Re: avro deserialization fails when using kafka

2016-06-06 Thread hsy...@gmail.com
Hey Raja,

By default, the kafka input operator and the conversion operator will run
in different containers. You can set the stream locality to THREAD_LOCAL or
CONTAINER_LOCAL. The input operator is I/O-intensive and your conversion
operator could be CPU-intensive, correct me if I am wrong.
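For example, locality can be set on the stream when the DAG is wired up. A
minimal sketch inside populateDAG, reusing the stream and operator names from
the application code quoted further down:

// CONTAINER_LOCAL deploys both operators into the same container;
// DAG.Locality.THREAD_LOCAL goes further and runs them on one thread.
dag.addStream("Kafka_Avro_Msg_Byte_Stream", kafkaInput.outputPort, avroConversion.input)
   .setLocality(DAG.Locality.CONTAINER_LOCAL);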
Another option is to extend AbstractKafkaSinglePortInputOperator
and override the getTuple method.
Here is an example:

import java.nio.ByteBuffer;
import java.util.Properties;

import org.apache.avro.generic.GenericRecord;

import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.contrib.kafka.AbstractKafkaSinglePortInputOperator;

import io.confluent.kafka.serializers.KafkaAvroDecoder;
import kafka.message.Message;
import kafka.utils.VerifiableProperties;

public class AvroKafkaInputOperator extends AbstractKafkaSinglePortInputOperator<GenericRecord>
{
  private String schemaRegURL;
  // transient: the decoder is rebuilt in setup(), so it must not be
  // checkpointed (Kryo cannot serialize it; see the fix in this thread)
  private transient KafkaAvroDecoder decoder;

  // no-arg constructor required by Kryo
  public AvroKafkaInputOperator()
  {
  }

  public AvroKafkaInputOperator(String schemaRegURL)
  {
    this.schemaRegURL = schemaRegURL;
  }

  /**
   * Setup call
   */
  @Override
  public void setup(OperatorContext context)
  {
    super.setup(context);
    Properties props = new Properties();
    props.setProperty("schema.registry.url", this.schemaRegURL);
    this.decoder = new KafkaAvroDecoder(new VerifiableProperties(props));
  }

  @Override
  public GenericRecord getTuple(Message msg)
  {
    // copy the payload out of the ByteBuffer before decoding
    ByteBuffer payload = msg.payload();
    byte[] bytes = new byte[payload.remaining()];
    payload.get(bytes);
    return (GenericRecord)decoder.fromBytes(bytes);
  }
}




On Mon, Jun 6, 2016 at 11:54 AM, Raja.Aravapalli  wrote:

>
> After making the variable transient, it worked fine.
>
> Raja.
>
> From: "Raja.Aravapalli" 
> Date: Monday, June 6, 2016 at 1:52 PM
>
> To: "users@apex.apache.org" 
> Subject: Re: avro deserialization fails when using kafka
>
>
> Thanks a lot Thomas & Ramanath.
>
> Your suggestions helped!! My issue is fixed. Thank you.
>
>
> Regards,
> Raja.
>
> From: Thomas Weise 
> Reply-To: "users@apex.apache.org" 
> Date: Monday, June 6, 2016 at 12:21 PM
> To: "users@apex.apache.org" 
> Subject: Re: avro deserialization fails when using kafka
>
> Since you are creating the decoder in setup(), please mark the property
> transient. No need to checkpoint it.
>
> Thanks,
> Thomas
>
>
> On Mon, Jun 6, 2016 at 10:06 AM, Munagala Ramanath 
> wrote:
>
>>
>> http://docs.datatorrent.com/troubleshooting/#application-throwing-following-kryo-exception
>>
>> Please try the suggestions at the above link.
>>
>> It appears from
>>
>> https://github.com/uber/confluent-schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroDecoder.java
>> that the class does not have a default constructor.
>>
>> Ram
>>
>> On Mon, Jun 6, 2016 at 9:40 AM, Raja.Aravapalli <
>> raja.aravapa...@target.com> wrote:
>>
>>>
>>> Hi,
>>>
>>> I am trying to read data from kafka, and my input in kafka is avro
>>> messages.
>>>
>>> So I am using class “KafkaSinglePortByteArrayInputOperator” to emit
>>> records from kafka.. And in the next operator I am reading input as
>>> "byte[]” and deserializing the message!!
>>>
>>> But the tuple deserialization is failing with below error in the log…
>>>
>>> Can someone pls share your thoughts and help me fix this?
>>>
>>>
>>> Caused by: com.esotericsoftware.kryo.KryoException: Class cannot be created 
>>> (missing no-arg constructor): 
>>> io.confluent.kafka.serializers.KafkaAvroDecoder
>>> Serialization trace:
>>> decoder (com.tgt.mighty.apexapps.AvroBytesConversionOperator)
>>> at 
>>> com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
>>> at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
>>> at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)
>>> at 
>>> com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547)
>>> at 
>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523)
>>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>>
>>>
>>>
>>> *Code FYR:*
>>>
>>>
>>> *Application.java* file:
>>>
>>> public void populateDAG(DAG dag, Configuration conf)
>>> {
>>>   //KafkaSinglePortStringInputOperator kafkaInput =  
>>> dag.addOperator("Kafka_Input", KafkaSinglePortStringInputOperator.class);
>>>
>>>   KafkaSinglePortByteArrayInputOperator kafkaInput =  
>>> dag.addOperator("Kafka_Input", new KafkaSinglePortByteArrayInputOperator());
>>>
>>>   AvroBytesConversionOperator avroConversion = 
>>> dag.addOperator("Avro_Convert", new 
>>> AvroBytesConversionOperator("schemaRegURL"));
>>>
>>>   HDFSWrite hdfs = dag.addOperator("To_HDFS", HDFSWrite.class);
>>>
>>>   //dag.addStream("Kafka_To_Hdfs_Ingestion", kafkaInput.outputPort, 
>>> hdfs.input);
>>>   dag.addStream("Kafka_Avro_Msg_Byte_Stream", kafkaInput.outputPort, 
>>> avroConversion.input);
>>>   dag.addStream("Avro_To_String_Stream", avroConversion.output, hdfs.input);
>>>
>>> }
>>>
>>>
>>> *Operator Code:*
>>>
>>> public class AvroBytesConversionOperator extends BaseOperator {
>>>
>>> private String schemaRegURL;
>>> private KafkaAvroDecoder decoder;
>>>
>>> public AvroBytesConversionOperator(){
>>>
>>> }
>>>
>>> public AvroBytesConversionOperator(String schemaRegURL){
>>> this.schemaRegURL = schemaRegURL;
>>> }
>>>
>>> /**
>>>  * Define

Re: kafka offset commit

2016-06-06 Thread hsy...@gmail.com
Hey Raja,

For 0.8, you have to implement the OffsetManager interface on your own.
updateOffsets will be called in the application master every time it gets
updated offsets from each physical partition, and the offsets you see
in that method are committed offsets. So you can safely save these offsets
into either ZooKeeper (the 0.8.2 client has an API to do that) or any other
datastore such as a DB or HDFS.  You also have to implement the method
loadInitialOffsets to load back the offsets you want.

You are welcome to contribute a default implementation of OffsetManager
using the built-in Kafka offset commit request API!
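A minimal sketch of such an implementation, assuming the OffsetManager and
KafkaPartition types from malhar-contrib's kafka package (method names as
described above; check the exact signatures against your Malhar version).
The in-memory map here only stands in for real ZooKeeper/DB/HDFS persistence:

import java.util.HashMap;
import java.util.Map;

import com.datatorrent.contrib.kafka.KafkaPartition;
import com.datatorrent.contrib.kafka.OffsetManager;

public class InMemoryOffsetManager implements OffsetManager
{
  private final Map<KafkaPartition, Long> store = new HashMap<>();

  @Override
  public Map<KafkaPartition, Long> loadInitialOffsets()
  {
    // called to seed the starting offset of each partition on (re)start
    return new HashMap<>(store);
  }

  @Override
  public void updateOffsets(Map<KafkaPartition, Long> offsetsOfPartitions)
  {
    // called in the application master with the committed offsets;
    // persist these externally so a cold restart can resume from them
    store.putAll(offsetsOfPartitions);
  }
}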

Regards,
Siyuan

On Mon, Jun 6, 2016 at 3:36 PM, Raja.Aravapalli 
wrote:

>
> Hi Thomas,
>
> We are using 0.8 cluster still!!
>
>
> Regards,
> Raja.
>
> From: Thomas Weise 
> Reply-To: "users@apex.apache.org" 
> Date: Monday, June 6, 2016 at 5:23 PM
> To: "users@apex.apache.org" 
> Subject: Re: kafka offset commit
>
> Hi Raja,
>
> Which Kafka version are you using?
>
> With the new 0.9 connector there is no need for the offset manager:
>
>
> https://github.com/apache/apex-malhar/tree/master/kafka/src/main/java/org/apache/apex/malhar/kafka
>
> Thanks,
> Thomas
>
>
> On Mon, Jun 6, 2016 at 3:06 PM, Raja.Aravapalli <
> raja.aravapa...@target.com> wrote:
>
>> Hi
>>
>> Can someone please help me understand, where will the offsets be stored
>> when consuming with “*KafkaSinglePortStringInputOperator*”  ?
>>
>> And, how to handle restarts ?
>>
>>
>> I worked with Storm earlier, Storm maintains the offsets in zookeeper and
>> client id is maintained for every consumer, using which
>>
>> - we can see what is the current offset status for a given partition &
>> modify them as well using zookeeper-cli !!
>> - restarts can be handled
>>
>>
>> As per the Apex documentation, I can see, that using OffsetManager we can
>> handle the restarts effectively, but couldn’t find any examples to refer…
>>
>> How clientId can be used to retrieve offsets status
>> And ability to edit the offsets etc
>>
>> can someone pls help me find this ?
>>
>>
>> Thanks a lot!!
>>
>>
>> -Regards,
>> Raja.
>>
>>
>>
>>
>


Re: kafka offset commit

2016-06-06 Thread hsy...@gmail.com
Raja,

Not exactly. Apex actually stores offsets as part of the operator state,
and the state of the operator is checkpointed internally and periodically (in
HDFS by default). For more details, you can read this:
https://www.datatorrent.com/blog/blog-introduction-to-checkpoint/

With that said, offsets are stored in HDFS along with the rest of the
operator's state so that it can recover in case of any system failure.
Also, in Apex you can do a stateful restart (start the application by
specifying the previous application id). It will initialize all operators,
load the checkpointed state (offsets included) from HDFS, and
continue running from that state.  The only limitation is that you cannot
easily tell what the current offsets are.  Hope this answered your question.

Regards,
Siyuan


On Mon, Jun 6, 2016 at 4:57 PM, Raja.Aravapalli 
wrote:

>
> Thanks Siyuan.
>
> So, to confirm, to apex is not storing offsets status at any location ?
> Like how Storm stores in Zookeeper ?
>
>
> Regards,
> Raja.
>
> From: "hsy...@gmail.com" 
> Reply-To: "users@apex.apache.org" 
> Date: Monday, June 6, 2016 at 6:42 PM
>
> To: "users@apex.apache.org" 
> Subject: Re: kafka offset commit
>
> Hey Raja,
>
> For 0.8, you have to implement OffsetManager interface on your own. The
> updateOffsets will be called in application master every time when it get
> updated offsets from each physical partition. And the offsets that you see
> in the method is committed offset. So you can safely save these offsets
> into either zookeeper(0.8.2 client has API to do that) or any other
> datastore like DB or HDFS.  And also you have to implement the method
> loadInitialOffsets to load back offset you want.
>
> You are welcome to contribute a default implementation using buildin kafka
> offset commit request API for OffsetManager!
>
> Regards,
> Siyuan
>
> On Mon, Jun 6, 2016 at 3:36 PM, Raja.Aravapalli <
> raja.aravapa...@target.com> wrote:
>
>>
>> Hi Thomas,
>>
>> We are using 0.8 cluster still!!
>>
>>
>> Regards,
>> Raja.
>>
>> From: Thomas Weise 
>> Reply-To: "users@apex.apache.org" 
>> Date: Monday, June 6, 2016 at 5:23 PM
>> To: "users@apex.apache.org" 
>> Subject: Re: kafka offset commit
>>
>> Hi Raja,
>>
>> Which Kafka version are you using?
>>
>> With the new 0.9 connector there is no need for the offset manager:
>>
>>
>> https://github.com/apache/apex-malhar/tree/master/kafka/src/main/java/org/apache/apex/malhar/kafka
>>
>> Thanks,
>> Thomas
>>
>>
>> On Mon, Jun 6, 2016 at 3:06 PM, Raja.Aravapalli <
>> raja.aravapa...@target.com> wrote:
>>
>>> Hi
>>>
>>> Can someone please help me understand, where will the offsets be stored
>>> when consuming with “*KafkaSinglePortStringInputOperator*”  ?
>>>
>>> And, how to handle restarts ?
>>>
>>>
>>> I worked with Storm earlier, Storm maintains the offsets in zookeeper
>>> and client id is maintained for every consumer, using which
>>>
>>> - we can see what is the current offset status for a given partition &
>>> modify them as well using zookeeper-cli !!
>>> - restarts can be handled
>>>
>>>
>>> As per the Apex documentation, I can see, that using OffsetManager we
>>> can handle the restarts effectively, but couldn’t find any examples to
>>> refer…
>>>
>>> How clientId can be used to retrieve offsets status
>>> And ability to edit the offsets etc
>>>
>>> can someone pls help me find this ?
>>>
>>>
>>> Thanks a lot!!
>>>
>>>
>>> -Regards,
>>> Raja.
>>>
>>>
>>>
>>>
>>
>


Re: kafka offset commit

2016-06-06 Thread hsy...@gmail.com
Hi Raja,

Yes, I think if you implement the interface and set it as an input operator
property it should serve the purpose.

I don't think it would be a bottleneck, since it is just a list data
structure of numbers and it only updates every checkpoint interval.
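For illustration, wiring it in might look like this (a sketch, assuming the
setOffsetManager property on the 0.8 input operator and the
InMemoryOffsetManager sketched earlier in the thread):

KafkaSinglePortStringInputOperator kafkaInput =
    dag.addOperator("Kafka_Input", new KafkaSinglePortStringInputOperator());
// the operator loads start offsets from the manager and reports
// committed offsets back to it at checkpoint intervals
kafkaInput.setOffsetManager(new InMemoryOffsetManager());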

Regards,
Siyuan

On Mon, Jun 6, 2016 at 5:43 PM, Raja.Aravapalli 
wrote:

>
>
> Thanks a lot Siyuan. It helped me understand better!!
>
>
> So, can you pls confirm, if I implement the offsetManager interface, it
> will be used to load initial starting position and update the offset
> status[at some interval] ?
>
> Will the application latency greatly decreases if I use HDFS for storage ?
>
> Thank you very much.
>
> Regards,
> Raja.
>
> From: "hsy...@gmail.com" 
> Reply-To: "users@apex.apache.org" 
> Date: Monday, June 6, 2016 at 7:13 PM
>
> To: "users@apex.apache.org" 
> Subject: Re: kafka offset commit
>
> Raja,
>
> Not exactly, Apex actually stores offsets as part of the operator state,
> And state of the operator are checkpointed internally and periodically( in
> HDFS by default). For more details, you can read this
> https://www.datatorrent.com/blog/blog-introduction-to-checkpoint/
>
> With that said, offsets are stored in HDFS along with other state of the
> operator so that it can recover in case of any system failure.
> And also in Apex, you can do stateful restart (start the application by
> specifying the previous application id). It will initialize all operators
> and load the checkpointed state (offsets will be part of it) from HDFS and
> continue run from that state.  The only limit is, you can not easy tell
> where the current offsets are.  Hope this answered your question.
>
> Regards,
> Siyuan
>
>
> On Mon, Jun 6, 2016 at 4:57 PM, Raja.Aravapalli <
> raja.aravapa...@target.com> wrote:
>
>>
>> Thanks Siyuan.
>>
>> So, to confirm, to apex is not storing offsets status at any location ?
>> Like how Storm stores in Zookeeper ?
>>
>>
>> Regards,
>> Raja.
>>
>> From: "hsy...@gmail.com" 
>> Reply-To: "users@apex.apache.org" 
>> Date: Monday, June 6, 2016 at 6:42 PM
>>
>> To: "users@apex.apache.org" 
>> Subject: Re: kafka offset commit
>>
>> Hey Raja,
>>
>> For 0.8, you have to implement OffsetManager interface on your own. The
>> updateOffsets will be called in application master every time when it get
>> updated offsets from each physical partition. And the offsets that you see
>> in the method is committed offset. So you can safely save these offsets
>> into either zookeeper(0.8.2 client has API to do that) or any other
>> datastore like DB or HDFS.  And also you have to implement the method
>> loadInitialOffsets to load back offset you want.
>>
>> You are welcome to contribute a default implementation using buildin
>> kafka offset commit request API for OffsetManager!
>>
>> Regards,
>> Siyuan
>>
>> On Mon, Jun 6, 2016 at 3:36 PM, Raja.Aravapalli <
>> raja.aravapa...@target.com> wrote:
>>
>>>
>>> Hi Thomas,
>>>
>>> We are using 0.8 cluster still!!
>>>
>>>
>>> Regards,
>>> Raja.
>>>
>>> From: Thomas Weise 
>>> Reply-To: "users@apex.apache.org" 
>>> Date: Monday, June 6, 2016 at 5:23 PM
>>> To: "users@apex.apache.org" 
>>> Subject: Re: kafka offset commit
>>>
>>> Hi Raja,
>>>
>>> Which Kafka version are you using?
>>>
>>> With the new 0.9 connector there is no need for the offset manager:
>>>
>>>
>>> https://github.com/apache/apex-malhar/tree/master/kafka/src/main/java/org/apache/apex/malhar/kafka
>>>
>>> Thanks,
>>> Thomas
>>>
>>>
>>> On Mon, Jun 6, 2016 at 3:06 PM, Raja.Aravapalli <
>>> raja.aravapa...@target.com> wrote:
>>>
>>>> Hi
>>>>
>>>> Can someone please help me understand, where will the offsets be stored
>>>> when consuming with “*KafkaSinglePortStringInputOperator*”  ?
>>>>
>>>> And, how to handle restarts ?
>>>>
>>>>
>>>> I worked with Storm earlier, Storm maintains the offsets in zookeeper
>>>> and client id is maintained for every consumer, using which
>>>>
>>>> - we can see what is the current offset status for a given partition &
>>>> modify them as well using zookeeper-cli !!
>>>> - restarts can be handled
>>>>
>>>>
>>>> As per the Apex documentation, I can see, that using OffsetManager we
>>>> can handle the restarts effectively, but couldn’t find any examples to
>>>> refer…
>>>>
>>>> How clientId can be used to retrieve offsets status
>>>> And ability to edit the offsets etc
>>>>
>>>> can someone pls help me find this ?
>>>>
>>>>
>>>> Thanks a lot!!
>>>>
>>>>
>>>> -Regards,
>>>> Raja.
>>>>
>>>>
>>>>
>>>>
>>>
>>
>


Re: Kafka 0.9 operator to start consuming from a particular offset

2016-06-10 Thread hsy...@gmail.com
Hi Ananth,
Unlike files, Kafka is usually for streaming cases. Correct me if I'm
wrong, but your use case seems like batch processing. We didn't consider an
end offset in our Kafka input operator design, though it could be a useful
feature. Unfortunately there is no easy way, as far as I know, to extend the
existing operator to achieve that.

OffsetManager is not designed for end offsets. It's only
a customizable callback to update the committed offsets, and the start
offsets it loads are intended for stateful application restart.

Can you create a ticket and elaborate your use case there? Thanks!

Regards,
Siyuan




On Friday, June 10, 2016, Ananth Gundabattula 
wrote:

> Hello All,
>
> I was wondering what would be the community's thoughts on the following ?
>
> We are using kafka 0.9 input operator to read from a few topics. We are
> using this stream to generate a parquet file. Now this approach is all good
> for a beginners use case. At a later point in time, we would like to
> "merge" all of the parquet files previously generated and for this I would
> like to reprocess data exactly from a particular offset inside each of the
> partitions. Each of the partitions will have their own starting and ending
> offsets that I need to process for.
>
> I was wondering if there is an easy way to extend the Kafka 0.9 operator (
> perhaps along the lines of the offset manager in the 0.8 versions of the
> kafka operator ) . Thoughts please ?
>
> Regards,
> Ananth
>


Questions about dag validation

2016-07-06 Thread hsy...@gmail.com
Hey guys,

I have a problem using the JDBC output operator. It is the same as
http://stackoverflow.com/questions/37887390/datatorrent-jdbcoperator-not-workiing

After troubleshooting, I found there might be a validation issue here:
the pre-launch validation of the DAG won't work in some special cases.

Here is an example, let's say we have an Operator

class MyOperator implements Operator {

  MyBean b;  // nested bean property

  // ... lifecycle methods omitted
}

class MyBean {

  @NotNull
  private String cantBeNullProp;

  public String getCantBeNullProp() { return cantBeNullProp; }

  public void setCantBeNullProp(String v) { cantBeNullProp = v; }
}

If the user forgets to set the cantBeNullProp property, the pre-launch
validation won't catch it.

Is this a limitation of javax validation, or can we do better to support this
kind of validation?
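For what it's worth, standard Bean Validation only cascades into a nested bean
when the field is annotated with @Valid, so a sketch like the following may
let the pre-launch validation catch the missing property (assuming the
platform's validator honors cascading):

import javax.validation.Valid;
import javax.validation.constraints.NotNull;

class MyOperator implements Operator {

  @Valid     // ask the validator to descend into the nested bean
  @NotNull   // and require the bean itself to be set
  MyBean b;

  // ... lifecycle methods omitted
}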

Regards,
Siyuan


Re: DataTorrent with SBT: .apa file not created

2016-07-11 Thread hsy...@gmail.com
I've never used SBT to build an Apex application, but I guess you can try 2
things here:
use the sbt maven plugin
https://github.com/shivawu/sbt-maven-plugin
or use the sbt assembly plugin
https://github.com/sbt/sbt-assembly

In the second approach, you need to translate the plugin configuration part
of the pom.xml into sbt scripts.
The configuration usually looks like this.

I hope this helps.


<plugin>
  <artifactId>maven-assembly-plugin</artifactId>
  <executions>
    <execution>
      <id>app-package-assembly</id>
      <phase>package</phase>
      <goals>
        <goal>single</goal>
      </goals>
      <configuration>
        <finalName>${project.artifactId}-${project.version}-apexapp</finalName>
        <appendAssemblyId>false</appendAssemblyId>
        <descriptors>
          <descriptor>src/assemble/appPackage.xml</descriptor>
        </descriptors>
        <archiverConfig>
          <defaultDirectoryMode>0755</defaultDirectoryMode>
        </archiverConfig>
        <archive>
          <manifestEntries>
            <Class-Path>${apex.apppackage.classpath}</Class-Path>
            <DT-Engine-Version>${apex.core.version}</DT-Engine-Version>
            <DT-App-Package-Group-Id>${apex.apppackage.groupid}</DT-App-Package-Group-Id>
            <DT-App-Package-Name>${project.artifactId}</DT-App-Package-Name>
            <DT-App-Package-Version>${project.version}</DT-App-Package-Version>
            <DT-App-Package-Display-Name>${project.name}</DT-App-Package-Display-Name>
            <DT-App-Package-Description>${project.description}</DT-App-Package-Description>
          </manifestEntries>
        </archive>
      </configuration>
    </execution>
  </executions>
</plugin>



On Mon, Jul 11, 2016 at 2:01 PM, Ankit Sarraf 
wrote:

> I am using SBT to create a DataTorrent Application. The project comprises
> of 2 parts. Part 1 is a Random Kafka Generator built using Scala. Part 2 is
> the DataTorrent Application (Java) to ingest data, process it, and write to
> HDFS.
>
> There are no errors while doing sbt assembly.
>
> Although, Uber JAR is created successfully, .apa file is not created. So
> does DataTorrent work with SBT?
>
> Thanks
> Ankit.
>


Re: A proposal for Malhar

2016-07-12 Thread hsy...@gmail.com
+1

On Tue, Jul 12, 2016 at 11:53 AM, David Yan  wrote:

> Hi all,
>
> I would like to renew the discussion of retiring operators in Malhar.
>
> As stated before, the reason why we would like to retire operators in
> Malhar is because some of them were written a long time ago before Apache
> incubation, and they do not pertain to real use cases, are not up to par in
> code quality, have no potential for improvement, and probably completely
> unused by anybody.
>
> We do not want contributors to use them as a model of their contribution,
> or users to use them thinking they are of quality, and then hit a wall.
> Both scenarios are not beneficial to the reputation of Apex.
>
> The initial 3 packages that we would like to target are *lib/algo*,
> *lib/math*, and *lib/streamquery*.
>
> I'm adding this thread to the users list. Please speak up if you are using
> any operator in these 3 packages. We would like to hear from you.
>
> These are the options I can think of for retiring those operators:
>
> 1) Completely remove them from the malhar repository.
> 2) Move them from malhar-library into a separate artifact called
> malhar-misc
> 3) Mark them deprecated and add to their javadoc that they are no longer
> supported
>
> Note that 2 and 3 are not mutually exclusive. Any thoughts?
>
> David
>
> On Tue, Jun 7, 2016 at 2:27 PM, Pramod Immaneni 
> wrote:
>
>> I wanted to close the loop on this discussion. In general everyone seemed
>> to be favorable to this idea with no serious objections. Folks had good
>> suggestions like documenting capabilities of operators, come up well
>> defined criteria for graduation of operators and what those criteria may
>> be
>> and what to do with existing operators that may not yet be mature or
>> unused.
>>
>> I am going to summarize the key points that resulted from the discussion
>> and would like to proceed with them.
>>
>>- Operators that do not yet provide the key platform capabilities to
>>make an operator useful across different applications such as
>> reusability,
>>partitioning static or dynamic, idempotency, exactly once will still be
>>accepted as long as they are functionally correct, have unit tests and
>> will
>>go into a separate module.
>>- Contrib module was suggested as a place where new contributions go in
>>that don't yet have all the platform capabilities and are not yet
>> mature.
>>If there are no other suggestions we will go with this one.
>>- It was suggested the operators documentation list those platform
>>capabilities it currently provides from the list above. I will
>> document a
>>structure for this in the contribution guidelines.
>>- Folks wanted to know what would be the criteria to graduate an
>>operator to the big leagues :). I will kick-off a separate thread for
>> it as
>>I think it requires its own discussion and hopefully we can come up
>> with a
>>set of guidelines for it.
>>- David brought up state of some of the existing operators and their
>>retirement and the layout of operators in Malhar in general and how it
>>causes problems with development. I will ask him to lead the
>> discussion on
>>that.
>>
>> Thanks
>>
>> On Fri, May 27, 2016 at 7:47 PM, David Yan  wrote:
>>
>> > The two ideas are not conflicting, but rather complementing.
>> >
>> > On the contrary, putting a new process for people trying to contribute
>> > while NOT addressing the old unused subpar operators in the repository
>> is
>> > what is conflicting.
>> >
>> > Keep in mind that when people try to contribute, they always look at the
>> > existing operators already in the repository as examples and likely a
>> model
>> > for their new operators.
>> >
>> > David
>> >
>> >
>> > On Fri, May 27, 2016 at 4:05 PM, Amol Kekre 
>> wrote:
>> >
>> > > Yes there are two conflicting threads now. The original thread was to
>> > open
>> > > up a way for contributors to submit code in a dir (contrib?) as long
>> as
>> > > license part of taken care of.
>> > >
>> > > On the thread of removing non-used operators -> How do we know what is
>> > > being used?
>> > >
>> > > Thks,
>> > > Amol
>> > >
>> > >
>> > > On Fri, May 27, 2016 at 3:40 PM, Sandesh Hegde <
>> sand...@datatorrent.com>
>> > > wrote:
>> > >
>> > > > +1 for removing the not-used operators.
>> > > >
>> > > > So we are creating a process for operator writers who don't want to
>> > > > understand the platform, yet wants to contribute? How big is that
>> set?
>> > > > If we tell the app-user, here is the code which has not passed all
>> the
>> > > > checklist, will they be ready to use that in production?
>> > > >
>> > > > This thread has 2 conflicting forces, reduce the operators and make
>> it
>> > > easy
>> > > > to add more operators.
>> > > >
>> > > >
>> > > >
>> > > > On Fri, May 27, 2016 at 3:03 PM Pramod Immaneni <
>> > pra...@datatorrent.com>
>> > > > wrote:
>> > > >
>> > > > > On Fri, May 27, 2016 at 2:30 PM, Gaurav Gupta <
>> > > gaurav.gopi...@

Re: A proposal for Malhar

2016-07-12 Thread hsy...@gmail.com
Why not have a shared Google sheet with a list of the operators and the
options we want to apply to each?
I think it's case by case.
But retiring unused or obsolete operators is important, and we should do it
sooner rather than later.

Regards,
Siyuan

On Tue, Jul 12, 2016 at 1:09 PM, Amol Kekre  wrote:

>
> My vote is to do 2&3
>
> Thks
> Amol
>
>
> On Tue, Jul 12, 2016 at 12:14 PM, Kottapalli, Venkatesh <
> vkottapa...@directv.com> wrote:
>
>> +1 for deprecating the packages listed below.
>>
>> -Original Message-
>> From: hsy...@gmail.com [mailto:hsy...@gmail.com]
>> Sent: Tuesday, July 12, 2016 12:01 PM
>>
>> +1
>>
>> On Tue, Jul 12, 2016 at 11:53 AM, David Yan 
>> wrote:
>>
>> > Hi all,
>> >
>> > I would like to renew the discussion of retiring operators in Malhar.
>> >
>> > As stated before, the reason why we would like to retire operators in
>> > Malhar is because some of them were written a long time ago before
>> > Apache incubation, and they do not pertain to real use cases, are not
>> > up to par in code quality, have no potential for improvement, and
>> > probably completely unused by anybody.
>> >
>> > We do not want contributors to use them as a model of their
>> > contribution, or users to use them thinking they are of quality, and
>> then hit a wall.
>> > Both scenarios are not beneficial to the reputation of Apex.
>> >
>> > The initial 3 packages that we would like to target are *lib/algo*,
>> > *lib/math*, and *lib/streamquery*.
>>
>> >
>> > I'm adding this thread to the users list. Please speak up if you are
>> > using any operator in these 3 packages. We would like to hear from you.
>> >
>> > These are the options I can think of for retiring those operators:
>> >
>> > 1) Completely remove them from the malhar repository.
>> > 2) Move them from malhar-library into a separate artifact called
>> > malhar-misc
>> > 3) Mark them deprecated and add to their javadoc that they are no
>> > longer supported
>> >
>> > Note that 2 and 3 are not mutually exclusive. Any thoughts?
>> >
>> > David
>> >
>> > On Tue, Jun 7, 2016 at 2:27 PM, Pramod Immaneni
>> > 
>> > wrote:
>> >
>> >> I wanted to close the loop on this discussion. In general everyone
>> >> seemed to be favorable to this idea with no serious objections. Folks
>> >> had good suggestions like documenting capabilities of operators, come
>> >> up well defined criteria for graduation of operators and what those
>> >> criteria may be and what to do with existing operators that may not
>> >> yet be mature or unused.
>> >>
>> >> I am going to summarize the key points that resulted from the
>> >> discussion and would like to proceed with them.
>> >>
>> >>- Operators that do not yet provide the key platform capabilities to
>> >>make an operator useful across different applications such as
>> >> reusability,
>> >>partitioning static or dynamic, idempotency, exactly once will
>> still be
>> >>accepted as long as they are functionally correct, have unit tests
>> >> and will
>> >>go into a separate module.
>> >>- Contrib module was suggested as a place where new contributions
>> go in
>> >>that don't yet have all the platform capabilities and are not yet
>> >> mature.
>> >>If there are no other suggestions we will go with this one.
>> >>- It was suggested the operators documentation list those platform
>> >>capabilities it currently provides from the list above. I will
>> >> document a
>> >>structure for this in the contribution guidelines.
>> >>- Folks wanted to know what would be the criteria to graduate an
>> >>operator to the big leagues :). I will kick-off a separate thread
>> >> for it as
>> >>I think it requires its own discussion and hopefully we can come
>> >> up with a
>> >>set of guidelines for it.
>> >>- David brought up state of some of the existing operators and their
>> >>retirement and the layout of operators in Malhar in general and how
>> it
>> >>causes problems with development. I will ask him to lead the
>> >> discussion on
>> >>that.
>> >>
>> >>

Re: Regarding using Scala to develop Apex app.

2016-07-15 Thread hsy...@gmail.com
Akshay and Ankit,

Scala support is definitely on our roadmap.
Akshay,
The exception you see might be related to a Scala function being compiled
to a Java anonymous class.  There are 2 possible workarounds (I haven't tried
them, but they could work):
1. You can try moving your functional expression (A => B) to a
well-defined class.
2. Use a third-party Kryo serializer for Scala classes
(https://github.com/twitter/chill) and annotate that functional field with
@Bind(SomeScalaSerializer.class).
Please let us know if either of them works :)

Ankit,
Resolving dependencies and compiling the code should not be the problem; the
problem is that the sbt assembly plugin seems to work differently from the
plugin in Maven, and we need to work on that part.  You can also try Gradle,
which has more features and supports both Scala and Maven very well. You are
very welcome to lead and contribute to the community!  :)

Regards,
Siyuan


On Thu, Jul 14, 2016 at 11:14 PM, Ankit Sarraf 
wrote:

> Interesting. Thanks for sharing the link. I surly feel that if DataTorrent
> has to expand its roots, it will have to evolve to support as many
> languages seamlessly as possible. So it seems to have complete support for
> Scala in future.
>
> Also, curious to know if people at DT are thinking of allowing SBT
> application build with the apps?
>
> On Jul 14, 2016 11:07 PM, "Akshay S Harale" 
> wrote:
>
> Hello,
>
> I found one blog post on writing apex app in scala
> .
> First I tried simple app it worked very well but when I introduced some
> anonymous functions in program, it started throwing kryo serialisation
> exception:
> *com.esotericsoftware.kryo.KryoException: Class cannot be created (missing
> no-arg constructor): com.sample.Aggregator$$anon$1*
>
> My question is : Will Apex have full support for scala in future ?
>
> Regards,
> Akshay S. Harale
> Software Developer @ Synerzip
> Skype – akshayharale
>
> This e-mail, including any attached files, may contain confidential and
> privileged information for the sole use of the intended recipient. Any
> review, use, distribution, or disclosure by others is strictly prohibited.
> If you are not the intended recipient (or authorized to receive information
> for the intended recipient), please contact the sender by reply e-mail and
> delete all copies of this message.
>
>
>


Re: Regarding using Scala to develop Apex app.

2016-07-15 Thread hsy...@gmail.com
BTW Akshay, if you are using an anonymous function as a field in the operator,
it's very likely that your function is stateless. If that's the case, you
can try marking it @transient (
http://www.scala-lang.org/api/rc2/scala/transient.html).


On Thu, Jul 14, 2016 at 11:06 PM, Akshay S Harale <
akshay.har...@synerzip.com> wrote:

> Hello,
>
> I found one blog post on writing apex app in scala
> .
> First I tried simple app it worked very well but when I introduced some
> anonymous functions in program, it started throwing kryo serialisation
> exception:
> *com.esotericsoftware.kryo.KryoException: Class cannot be created (missing
> no-arg constructor): com.sample.Aggregator$$anon$1*
>
> My question is : Will Apex have full support for scala in future ?
>
> Regards,
> Akshay S. Harale
> Software Developer @ Synerzip
> Skype – akshayharale
>
> This e-mail, including any attached files, may contain confidential and
> privileged information for the sole use of the intended recipient. Any
> review, use, distribution, or disclosure by others is strictly prohibited.
> If you are not the intended recipient (or authorized to receive information
> for the intended recipient), please contact the sender by reply e-mail and
> delete all copies of this message.
>
>


Re: Force Fail Application

2016-07-18 Thread hsy...@gmail.com
Hey Michael,

You can throw a ShutdownException.
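For example, from the operator's setup() (a sketch: requiredFilePath is a
hypothetical property, and the exact ShutdownException import should be
checked against your Apex version):

@Override
public void setup(Context.OperatorContext context)
{
  // aborts the whole application, not just this operator
  if (!new java.io.File(requiredFilePath).exists()) {
    throw new com.datatorrent.api.ShutdownException();
  }
}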

Siyuan

On Mon, Jul 18, 2016 at 10:06 AM, Silver, Michael <
michael.sil...@capitalone.com> wrote:

>
>
>
>
> Hello,
>
>
>
> I am looking for a solution to force shutdown or fail my application. I
> have an operator that checks that a file (which is needed for the
> application to run) is present during setup. If the file is not present I
> want the entire application to fail. How would I do this in apex?
>
>
>
> Thank you,
>
>
>
> Michael
>
> --
>
> The information contained in this e-mail is confidential and/or
> proprietary to Capital One and/or its affiliates and may only be used
> solely in performance of work or services for Capital One. The information
> transmitted herewith is intended only for use by the individual or entity
> to which it is addressed. If the reader of this message is not the intended
> recipient, you are hereby notified that any review, retransmission,
> dissemination, distribution, copying or other use of, or taking of any
> action in reliance upon this information is strictly prohibited. If you
> have received this communication in error, please contact the sender and
> delete the material from your computer.
>


Re: can operators emit on a different from the operator itself thread?

2016-08-10 Thread hsy...@gmail.com
Hey Vlad,

Thanks for bringing this up. Is there an easy way to detect unexpected use
of the emit method without hurting performance? Or could we at least detect
this in debug mode?

Regards,
Siyuan

On Wed, Aug 10, 2016 at 11:27 AM, Vlad Rozov 
wrote:

> The short answer is no, creating worker thread to emit tuples is not
> supported by Apex and will lead to an undefined behavior. Operators in Apex
> have strong thread affinity and all interaction with the platform must
> happen on the operator thread.
>
> Vlad
>


Re: Malhar 0.8.1.1 Kafka Operator and chroot

2016-08-24 Thread hsy...@gmail.com
Hey McCullough,

What malhar version do you use?

Regards,
Siyuan

On Wed, Aug 24, 2016 at 9:07 AM, McCullough, Alex <
alex.mccullo...@capitalone.com> wrote:

> Hey All,
>
>
>
> We are using the 0.8.1 kafka operator and the ZK connection string has a
> chroot on it. We get errors when launching and the app fails, is there a
> proper way in apex to append a chroot?
>
>
>
>
>
> **the ip’s are masked with #, but that’s not how they appear in our code
> obviously**
>
>
>
> When we add this to the property for ZK:
>
>
>
>
>
> 
>
>dt.operator.kafkaInputOperator.prop.consumer.zookeeper
>
>10.##.##.#:2181,10.##.##.##:2181,10.##.##.##:2181,10.
> ##.##.#:2181/kafka2
>
> 
>
>
>
>
>
>
>
> We get this error (connecting to a cluster without chroot it works fine):
>
>
>
>
>
> 2016-08-24 11:55:13,448 [main-EventThread] INFO  zkclient.ZkClient
> processStateChanged - zookeeper state changed (SyncConnected)
>
> 2016-08-24 11:55:13,585 [ZkClient-EventThread-30-10.##
> .##.#:2181,10.##.##.##:2181,10.##.##.#:2181/kafka2,10.##.##.##:2181]
> INFO  zkclient.ZkEventThread run - Terminate ZkClient event thread.
>
> 2016-08-24 11:55:13,596 [main] INFO  zookeeper.ZooKeeper close - Session:
> 0x4558654aacf4263 closed
>
> 2016-08-24 11:55:13,596 [main-EventThread] INFO  zookeeper.ClientCnxn run
> - EventThread shut down
>
> 2016-08-24 11:55:13,597 [main] INFO  kafka.AbstractKafkaInputOperator
> definePartitions - [ONE_TO_ONE]: Initializing partition(s)
>
> 2016-08-24 11:55:13,602 [main] INFO  service.AbstractService noteFailure -
> Service com.datatorrent.stram.StreamingAppMasterService failed in state
> INITED; cause: java.lang.IllegalArgumentException: there has to be one
> idempotent storage manager
>
> java.lang.IllegalArgumentException: there has to be one idempotent
> storage manager
>
> at com.google.common.base.Preconditions.checkArgument(
> Preconditions.java:93)
>
> at org.apache.apex.malhar.lib.wal.FSWindowDataManager.partitioned(
> FSWindowDataManager.java:251)
>
> at com.datatorrent.contrib.kafka.AbstractKafkaInputOperator.
> definePartitions(AbstractKafkaInputOperator.java:637)
>
> at com.datatorrent.stram.plan.physical.PhysicalPlan.
> initPartitioning(PhysicalPlan.java:752)
>
> at com.datatorrent.stram.plan.physical.PhysicalPlan.
> addLogicalOperator(PhysicalPlan.java:1676)
>
> at com.datatorrent.stram.plan.physical.PhysicalPlan.(
> PhysicalPlan.java:378)
>
> at com.datatorrent.stram.StreamingContainerManager.(
> StreamingContainerManager.java:418)
>
> at com.datatorrent.stram.StreamingContainerManager.getInstance(
> StreamingContainerManager.java:3023)
>
> at com.datatorrent.stram.StreamingAppMasterService.serviceInit(
> StreamingAppMasterService.java:551)
>
> at org.apache.hadoop.service.AbstractService.init(
> AbstractService.java:163)
>
> at com.datatorrent.stram.StreamingAppMaster.main(
> StreamingAppMaster.java:102)
>
> 2016-08-24 11:55:13,604 [main] WARN  service.AbstractService stopQuietly -
> When stopping the service com.datatorrent.stram.StreamingAppMasterService
> : java.lang.NullPointerException
>
> java.lang.NullPointerException
>
> at com.datatorrent.stram.StreamingAppMasterService.serviceStop(
> StreamingAppMasterService.java:629)
>
> at org.apache.hadoop.service.AbstractService.stop(
> AbstractService.java:221)
>
> at org.apache.hadoop.service.ServiceOperations.stop(
> ServiceOperations.java:52)
>
> at org.apache.hadoop.service.ServiceOperations.stopQuietly(
> ServiceOperations.java:80)
>
> at org.apache.hadoop.service.AbstractService.init(
> AbstractService.java:171)
>
> at com.datatorrent.stram.StreamingAppMaster.main(
> StreamingAppMaster.java:102)
>
> 2016-08-24 11:55:13,605 [main] ERROR stram.StreamingAppMaster main -
> Exiting Application Master
>
> java.lang.IllegalArgumentException: there has to be one idempotent
> storage manager
>
> at com.google.common.base.Preconditions.checkArgument(
> Preconditions.java:93)
>
> at org.apache.apex.malhar.lib.wal.FSWindowDataManager.partitioned(
> FSWindowDataManager.java:251)
>
> at com.datatorrent.contrib.kafka.AbstractKafkaInputOperator.
> definePartitions(AbstractKafkaInputOperator.java:637)
>
> at com.datatorrent.stram.plan.physical.PhysicalPlan.
> initPartitioning(PhysicalPlan.java:752)
>
> at com.datatorrent.stram.plan.physical.PhysicalPlan.
> addLogicalOperator(PhysicalPlan.java:1676)
>
> at com.datatorrent.stram.plan.physical.PhysicalPlan.(
> PhysicalPlan.java:378)
>
> at com.datatorrent.stram.StreamingContainerManager.(
> StreamingContainerManager.java:418)
>
> at com.datatorrent.stram.StreamingContainerManager.getInstance(
> StreamingContainerManager.java:3023)
>
> at com.datatorrent.stram.StreamingAppMasterService.serviceInit(
> StreamingAppMasterService.java:551)
>
> at org.apache.hadoop.service.AbstractService.init(
> AbstractService.java:163)
>
> at com.datatorrent.stram.StreamingAppMaster.main(
> StreamingAppMaster.ja

Re: Malhar 0.8.1.1 Kafka Operator and chroot

2016-08-24 Thread hsy...@gmail.com
Hey Alex,

Do you use ONE_TO_ONE or ONE_TO_MANY partition?

Regards,
Siyuan

On Wed, Aug 24, 2016 at 10:27 AM, McCullough, Alex <
alex.mccullo...@capitalone.com> wrote:

> Hey Siyuan,
>
>
>
> We are using 3.4.0
>
>
>
> Thanks,
>
> Alex
>
> *From: *"hsy...@gmail.com" 
> *Reply-To: *"users@apex.apache.org" 
> *Date: *Wednesday, August 24, 2016 at 12:47 PM
> *To: *"users@apex.apache.org" 
> *Subject: *Re: Malhar 0.8.1.1 Kafka Operator and chroot
>
>
>
> Hey McCullough,
>
>
>
> What malhar version do you use?
>
>
>
> Regards,
>
> Siyuan
>
>
>
> On Wed, Aug 24, 2016 at 9:07 AM, McCullough, Alex <
> alex.mccullo...@capitalone.com> wrote:
>
> Hey All,
>
>
>
> We are using the 0.8.1 kafka operator and the ZK connection string has a
> chroot on it. We get errors when launching and the app fails, is there a
> proper way in apex to append a chroot?
>
>
>
>
>
> **the ip’s are masked with #, but that’s not how they appear in our code
> obviously**
>
>
>
> When we add this to the property for ZK:
>
>
>
>
>
> 
>
>dt.operator.kafkaInputOperator.prop.consumer.zookeeper
>
>10.##.##.#:2181,10.##.##.##:2181,10.##.##.##:2181,10.
> ##.##.#:2181/kafka2
>
> 
>
>
>
>
>
>
>
> We get this error (connecting to a cluster without chroot it works fine):
>
>
>
>
>
> 2016-08-24 11:55:13,448 [main-EventThread] INFO  zkclient.ZkClient
> processStateChanged - zookeeper state changed (SyncConnected)
>
> 2016-08-24 11:55:13,585 [ZkClient-EventThread-30-10.##
> .##.#:2181,10.##.##.##:2181,10.##.##.#:2181/kafka2,10.##.##.##:2181]
> INFO  zkclient.ZkEventThread run - Terminate ZkClient event thread.
>
> 2016-08-24 11:55:13,596 [main] INFO  zookeeper.ZooKeeper close - Session:
> 0x4558654aacf4263 closed
>
> 2016-08-24 11:55:13,596 [main-EventThread] INFO  zookeeper.ClientCnxn run
> - EventThread shut down
>
> 2016-08-24 11:55:13,597 [main] INFO  kafka.AbstractKafkaInputOperator
> definePartitions - [ONE_TO_ONE]: Initializing partition(s)
>
> 2016-08-24 11:55:13,602 [main] INFO  service.AbstractService noteFailure -
> Service com.datatorrent.stram.StreamingAppMasterService failed in state
> INITED; cause: java.lang.IllegalArgumentException: there has to be one
> idempotent storage manager
>
> java.lang.IllegalArgumentException: there has to be one idempotent
> storage manager
>
> at com.google.common.base.Preconditions.checkArgument(
> Preconditions.java:93)
>
> at org.apache.apex.malhar.lib.wal.FSWindowDataManager.partitioned(
> FSWindowDataManager.java:251)
>
> at com.datatorrent.contrib.kafka.AbstractKafkaInputOperator.
> definePartitions(AbstractKafkaInputOperator.java:637)
>
> at com.datatorrent.stram.plan.physical.PhysicalPlan.
> initPartitioning(PhysicalPlan.java:752)
>
> at com.datatorrent.stram.plan.physical.PhysicalPlan.
> addLogicalOperator(PhysicalPlan.java:1676)
>
> at com.datatorrent.stram.plan.physical.PhysicalPlan.(
> PhysicalPlan.java:378)
>
> at com.datatorrent.stram.StreamingContainerManager.(
> StreamingContainerManager.java:418)
>
> at com.datatorrent.stram.StreamingContainerManager.getInstance(
> StreamingContainerManager.java:3023)
>
> at com.datatorrent.stram.StreamingAppMasterService.serviceInit(
> StreamingAppMasterService.java:551)
>
> at org.apache.hadoop.service.AbstractService.init(
> AbstractService.java:163)
>
> at com.datatorrent.stram.StreamingAppMaster.main(
> StreamingAppMaster.java:102)
>
> 2016-08-24 11:55:13,604 [main] WARN  service.AbstractService stopQuietly -
> When stopping the service com.datatorrent.stram.StreamingAppMasterService
> : java.lang.NullPointerException
>
> java.lang.NullPointerException
>
> at com.datatorrent.stram.StreamingAppMasterService.serviceStop(
> StreamingAppMasterService.java:629)
>
> at org.apache.hadoop.service.AbstractService.stop(
> AbstractService.java:221)
>
> at org.apache.hadoop.service.ServiceOperations.stop(
> ServiceOperations.java:52)
>
> at org.apache.hadoop.service.ServiceOperations.stopQuietly(
> ServiceOperations.java:80)
>
> at org.apache.hadoop.service.AbstractService.init(
> AbstractService.java:171)
>
> at com.datatorrent.stram.StreamingAppMaster.main(
> StreamingAppMaster.java:102)
>
> 2016-08-24 11:55:13,605 [main] ERROR stram.StreamingAppMaster main -
> Exiting Application Master
>
> java.lang.IllegalArgumentException: there has to be one idempotent
> storage manager
>
> at com.google.common.base.Preconditions.checkArgument(

Re: Malhar 0.8.1.1 Kafka Operator and chroot

2016-08-24 Thread hsy...@gmail.com
Hey Alex,

Yeah, I think there is a bug in the multi-tenant Kafka support in the code. I
have created a ticket:
https://issues.apache.org/jira/browse/APEXMALHAR-2199

For now, can you try one thing:
can you set your zookeeper connection to something like this:



<property>
  <name>dt.operator.kafkaInputOperator.prop.consumer.zookeeper</name>
  <value>10.##.##.#:2181/kafka2,10.##.##.##:2181/kafka2,10.##.##.##:2181/kafka2,10.##.##.#:2181/kafka2</value>
</property>

Or you can try setting just one of the zookeeper nodes.

The kafka client only needs to know one running node, but you'll lose
zookeeper HA.


Regards,

Siyuan

On Wed, Aug 24, 2016 at 10:40 AM, McCullough, Alex <
alex.mccullo...@capitalone.com> wrote:

> ONE_TO_ONE
>
>
>
>
>
>
>
> *From: *"hsy...@gmail.com" 
> *Reply-To: *"users@apex.apache.org" 
> *Date: *Wednesday, August 24, 2016 at 1:38 PM
>
> *To: *"users@apex.apache.org" 
> *Subject: *Re: Malhar 0.8.1.1 Kafka Operator and chroot
>
>
>
> Hey Alex,
>
>
>
> Do you use ONE_TO_ONE or ONE_TO_MANY partition?
>
>
>
> Regards,
>
> Siyuan
>
>
>
> On Wed, Aug 24, 2016 at 10:27 AM, McCullough, Alex <
> alex.mccullo...@capitalone.com> wrote:
>
> Hey Siyuan,
>
>
>
> We are using 3.4.0
>
>
>
> Thanks,
>
> Alex
>
> *From: *"hsy...@gmail.com" 
> *Reply-To: *"users@apex.apache.org" 
> *Date: *Wednesday, August 24, 2016 at 12:47 PM
> *To: *"users@apex.apache.org" 
> *Subject: *Re: Malhar 0.8.1.1 Kafka Operator and chroot
>
>
>
> Hey McCullough,
>
>
>
> What malhar version do you use?
>
>
>
> Regards,
>
> Siyuan
>
>
>
> On Wed, Aug 24, 2016 at 9:07 AM, McCullough, Alex <
> alex.mccullo...@capitalone.com> wrote:
>
> Hey All,
>
>
>
> We are using the 0.8.1 kafka operator and the ZK connection string has a
> chroot on it. We get errors when launching and the app fails, is there a
> proper way in apex to append a chroot?
>
>
>
>
>
> **the ip’s are masked with #, but that’s not how they appear in our code
> obviously**
>
>
>
> When we add this to the property for ZK:
>
>
>
>
>
> 
>
>dt.operator.kafkaInputOperator.prop.consumer.zookeeper
>
>10.##.##.#:2181,10.##.##.##:2181,10.##.##.##:2181,10.
> ##.##.#:2181/kafka2
>
> 
>
>
>
>
>
>
>
> We get this error (connecting to a cluster without chroot it works fine):
>
>
>
>
>
> 2016-08-24 11:55:13,448 [main-EventThread] INFO  zkclient.ZkClient
> processStateChanged - zookeeper state changed (SyncConnected)
>
> 2016-08-24 11:55:13,585 [ZkClient-EventThread-30-10.##
> .##.#:2181,10.##.##.##:2181,10.##.##.#:2181/kafka2,10.##.##.##:2181]
> INFO  zkclient.ZkEventThread run - Terminate ZkClient event thread.
>
> 2016-08-24 11:55:13,596 [main] INFO  zookeeper.ZooKeeper close - Session:
> 0x4558654aacf4263 closed
>
> 2016-08-24 11:55:13,596 [main-EventThread] INFO  zookeeper.ClientCnxn run
> - EventThread shut down
>
> 2016-08-24 11:55:13,597 [main] INFO  kafka.AbstractKafkaInputOperator
> definePartitions - [ONE_TO_ONE]: Initializing partition(s)
>
> 2016-08-24 11:55:13,602 [main] INFO  service.AbstractService noteFailure -
> Service com.datatorrent.stram.StreamingAppMasterService failed in state
> INITED; cause: java.lang.IllegalArgumentException: there has to be one
> idempotent storage manager
>
> java.lang.IllegalArgumentException: there has to be one idempotent
> storage manager
>
> at com.google.common.base.Preconditions.checkArgument(
> Preconditions.java:93)
>
> at org.apache.apex.malhar.lib.wal.FSWindowDataManager.partitioned(
> FSWindowDataManager.java:251)
>
> at com.datatorrent.contrib.kafka.AbstractKafkaInputOperator.
> definePartitions(AbstractKafkaInputOperator.java:637)
>
> at com.datatorrent.stram.plan.physical.PhysicalPlan.
> initPartitioning(PhysicalPlan.java:752)
>
> at com.datatorrent.stram.plan.physical.PhysicalPlan.
> addLogicalOperator(PhysicalPlan.java:1676)
>
> at com.datatorrent.stram.plan.physical.PhysicalPlan.(
> PhysicalPlan.java:378)
>
> at com.datatorrent.stram.StreamingContainerManager.(
> StreamingContainerManager.java:418)
>
> at com.datatorrent.stram.StreamingContainerManager.getInstance(
> StreamingContainerManager.java:3023)
>
> at com.datatorrent.stram.StreamingAppMasterService.serviceInit(
> StreamingAppMasterService.java:551)
>
> at org.apache.hadoop.service.AbstractService.init(
> AbstractService.java:163)
>
> at com.datatorrent.stram.StreamingAppMaster.main(
> StreamingAppMaster.java:102)
>
> 2016-08-24 11

Re: Malhar 0.8.1.1 Kafka Operator and chroot

2016-08-25 Thread hsy...@gmail.com
Hey Alex,

Does the workaround work? I just want to follow up to see whether my
hypothesis about the root cause is correct. Thanks!

Regards,
Siyuan

On Wed, Aug 24, 2016 at 10:56 AM, hsy...@gmail.com  wrote:

> Hey Alex,
>
> Yeah, I think there is a bug for multitenant kafka support in the code. I
> have created a ticket
> https://issues.apache.org/jira/browse/APEXMALHAR-2199
>
> For now can you try one thing:
> Can you try to set your zookeeper to something like this:
>
> 
>
>dt.operator.kafkaInputOperator.prop.consumer.zookeeper
>
>10.##.##.#:2181/kafka2,10.##.##.##:2181/kafka2,10.##.
> ##.##:2181/kafka2,10.##.##.#:2181/kafka2
>
> 
>
>
> or you can just try to set just one of the zookeeper nodes.
>
> For kafka client it only needs to know one running node but you'll lose
> zookeeper HA
>
>
> Regards,
>
> Siyuan
>
> On Wed, Aug 24, 2016 at 10:40 AM, McCullough, Alex <
> alex.mccullo...@capitalone.com> wrote:
>
>> ONE_TO_ONE
>>
>>
>>
>>
>>
>>
>>
>> *From: *"hsy...@gmail.com" 
>> *Reply-To: *"users@apex.apache.org" 
>> *Date: *Wednesday, August 24, 2016 at 1:38 PM
>>
>> *To: *"users@apex.apache.org" 
>> *Subject: *Re: Malhar 0.8.1.1 Kafka Operator and chroot
>>
>>
>>
>> Hey Alex,
>>
>>
>>
>> Do you use ONE_TO_ONE or ONE_TO_MANY partition?
>>
>>
>>
>> Regards,
>>
>> Siyuan
>>
>>
>>
>> On Wed, Aug 24, 2016 at 10:27 AM, McCullough, Alex <
>> alex.mccullo...@capitalone.com> wrote:
>>
>> Hey Siyuan,
>>
>>
>>
>> We are using 3.4.0
>>
>>
>>
>> Thanks,
>>
>> Alex
>>
>> *From: *"hsy...@gmail.com" 
>> *Reply-To: *"users@apex.apache.org" 
>> *Date: *Wednesday, August 24, 2016 at 12:47 PM
>> *To: *"users@apex.apache.org" 
>> *Subject: *Re: Malhar 0.8.1.1 Kafka Operator and chroot
>>
>>
>>
>> Hey McCullough,
>>
>>
>>
>> What malhar version do you use?
>>
>>
>>
>> Regards,
>>
>> Siyuan
>>
>>
>>
>> On Wed, Aug 24, 2016 at 9:07 AM, McCullough, Alex <
>> alex.mccullo...@capitalone.com> wrote:
>>
>> Hey All,
>>
>>
>>
>> We are using the 0.8.1 kafka operator and the ZK connection string has a
>> chroot on it. We get errors when launching and the app fails, is there a
>> proper way in apex to append a chroot?
>>
>>
>>
>>
>>
>> **the ip’s are masked with #, but that’s not how they appear in our code
>> obviously**
>>
>>
>>
>> When we add this to the property for ZK:
>>
>>
>>
>>
>>
>> 
>>
>>dt.operator.kafkaInputOperator.prop.consumer.
>> zookeeper
>>
>>10.##.##.#:2181,10.##.##.##:2181,10.##.##.##:2181,10.
>> ##.##.#:2181/kafka2
>>
>> 
>>
>>
>>
>>
>>
>>
>>
>> We get this error (connecting to a cluster without chroot it works fine):
>>
>>
>>
>>
>>
>> 2016-08-24 11:55:13,448 [main-EventThread] INFO  zkclient.ZkClient
>> processStateChanged - zookeeper state changed (SyncConnected)
>>
>> 2016-08-24 11:55:13,585 [ZkClient-EventThread-30-10.##
>> .##.#:2181,10.##.##.##:2181,10.##.##.#:2181/kafka2,10.##.##.##:2181]
>> INFO  zkclient.ZkEventThread run - Terminate ZkClient event thread.
>>
>> 2016-08-24 11:55:13,596 [main] INFO  zookeeper.ZooKeeper close - Session:
>> 0x4558654aacf4263 closed
>>
>> 2016-08-24 11:55:13,596 [main-EventThread] INFO  zookeeper.ClientCnxn run
>> - EventThread shut down
>>
>> 2016-08-24 11:55:13,597 [main] INFO  kafka.AbstractKafkaInputOperator
>> definePartitions - [ONE_TO_ONE]: Initializing partition(s)
>>
>> 2016-08-24 11:55:13,602 [main] INFO  service.AbstractService noteFailure
>> - Service com.datatorrent.stram.StreamingAppMasterService failed in
>> state INITED; cause: java.lang.IllegalArgumentException: there has to be
>> one idempotent storage manager
>>
>> java.lang.IllegalArgumentException: there has to be one idempotent
>> storage manager
>>
>> at com.google.common.base.Preconditions.checkArgument(Precondit
>> ions.java:93)
>>
>> at org.apache.apex.malhar.lib.wal.FSWindowDataManager.partition
>> ed(FSWindowDataManager.java:251)
>>
>> at com.datatorrent.contrib.kafka.AbstractKafkaInputOperator.def

Re: Malhar 0.8.1.1 Kafka Operator and chroot

2016-08-26 Thread hsy...@gmail.com
Hey Alex,

Theoretically, you can discover the other ZK nodes from one, but we use
zookeeper only to discover brokers. This is a limitation in the code, though I
think once we fix the bug to support full path names, we won't have to
support discovery of the other ZK nodes.

So right now, if the only ZK node you specify dies, the operator will not
be able to refresh the metadata and will fail at some point.

Regards,
Siyuan

On Thu, Aug 25, 2016 at 2:45 PM, McCullough, Alex <
alex.mccullo...@capitalone.com> wrote:

> Hey Siyuan,
>
>
>
> Looks like using the single ZK IP with the chroot works, but fails with
> the same error when adding more than one.
>
>
>
> Can explain the exact impact of only having the single zk node listed? It
> can auto discover with ZK the other nodes, but if the one we connect to
> drops than Apex won’t know any other server to connect to?
>
>
>
> Thanks,
>
> Alex
>
>
>
> *From: *"hsy...@gmail.com" 
> *Reply-To: *"users@apex.apache.org" 
> *Date: *Thursday, August 25, 2016 at 12:57 PM
>
> *To: *"users@apex.apache.org" 
> *Subject: *Re: Malhar 0.8.1.1 Kafka Operator and chroot
>
>
>
> Hey Alex,
>
>
>
> Does the workaround work? I just want to follow up to see my hypothesis
> for the root cause is correct. Thanks!
>
>
>
> Regards,
>
> Siyuan
>
>
>
> On Wed, Aug 24, 2016 at 10:56 AM, hsy...@gmail.com 
> wrote:
>
> Hey Alex,
>
>
>
> Yeah, I think there is a bug for multitenant kafka support in the code. I
> have created a ticket
>
> https://issues.apache.org/jira/browse/APEXMALHAR-2199
>
>
>
> For now can you try one thing:
>
> Can you try to set your zookeeper to something like this:
>
> 
>
>dt.operator.kafkaInputOperator.prop.consumer.zookeeper
>
>10.##.##.#:2181/kafka2,10.##.##.##:2181/kafka2,10.##.
> ##.##:2181/kafka2,10.##.##.#:2181/kafka2
>
> 
>
>
>
> or you can just try to set just one of the zookeeper nodes.
>
> For kafka client it only needs to know one running node but you'll lose
> zookeeper HA
>
>
>
> Regards,
>
> Siyuan
>
>
>
> On Wed, Aug 24, 2016 at 10:40 AM, McCullough, Alex <
> alex.mccullo...@capitalone.com> wrote:
>
> ONE_TO_ONE
>
>
>
>
>
>
>
> *From: *"hsy...@gmail.com" 
> *Reply-To: *"users@apex.apache.org" 
> *Date: *Wednesday, August 24, 2016 at 1:38 PM
>
>
> *To: *"users@apex.apache.org" 
> *Subject: *Re: Malhar 0.8.1.1 Kafka Operator and chroot
>
>
>
> Hey Alex,
>
>
>
> Do you use ONE_TO_ONE or ONE_TO_MANY partition?
>
>
>
> Regards,
>
> Siyuan
>
>
>
> On Wed, Aug 24, 2016 at 10:27 AM, McCullough, Alex <
> alex.mccullo...@capitalone.com> wrote:
>
> Hey Siyuan,
>
>
>
> We are using 3.4.0
>
>
>
> Thanks,
>
> Alex
>
> *From: *"hsy...@gmail.com" 
> *Reply-To: *"users@apex.apache.org" 
> *Date: *Wednesday, August 24, 2016 at 12:47 PM
> *To: *"users@apex.apache.org" 
> *Subject: *Re: Malhar 0.8.1.1 Kafka Operator and chroot
>
>
>
> Hey McCullough,
>
>
>
> What malhar version do you use?
>
>
>
> Regards,
>
> Siyuan
>
>
>
> On Wed, Aug 24, 2016 at 9:07 AM, McCullough, Alex <
> alex.mccullo...@capitalone.com> wrote:
>
> Hey All,
>
>
>
> We are using the 0.8.1 kafka operator and the ZK connection string has a
> chroot on it. We get errors when launching and the app fails, is there a
> proper way in apex to append a chroot?
>
>
>
>
>
> **the ip’s are masked with #, but that’s not how they appear in our code
> obviously**
>
>
>
> When we add this to the property for ZK:
>
>
>
>
>
> 
>
>dt.operator.kafkaInputOperator.prop.consumer.zookeeper
>
>10.##.##.#:2181,10.##.##.##:2181,10.##.##.##:2181,10.
> ##.##.#:2181/kafka2
>
> 
>
>
>
>
>
>
>
> We get this error (connecting to a cluster without chroot it works fine):
>
>
>
>
>
> 2016-08-24 11:55:13,448 [main-EventThread] INFO  zkclient.ZkClient
> processStateChanged - zookeeper state changed (SyncConnected)
>
> 2016-08-24 11:55:13,585 [ZkClient-EventThread-30-10.##
> .##.#:2181,10.##.##.##:2181,10.##.##.#:2181/kafka2,10.##.##.##:2181]
> INFO  zkclient.ZkEventThread run - Terminate ZkClient event thread.
>
> 2016-08-24 11:55:13,596 [main] INFO  zookeeper.ZooKeeper close - Session:
> 0x4558654aacf4263 closed
>
> 2016-08-24 11:55:13,596 [main-EventThread] INFO  zookeeper.ClientCnxn run
> - EventThread shut down
>
> 2016-08-24 11:55:13,5

Re: Emit values from an array

2016-09-30 Thread hsy...@gmail.com
Is your POJO a shared object? I think you need to create a new POJO every time.
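The stream and downstream operators can still hold references to the emitted
object, so mutating one shared instance makes every buffered tuple end up
with the last values written. A sketch of the fix against the loop quoted
below (MyPojo stands in for the actual POJO class):

for (int j = 0; j < cares.length; j++) {
  MyPojo pojo = new MyPojo(); // fresh instance per emitted tuple
  pojo.setID(id);
  pojo.seturl(url);
  pojo.setPosition(j);
  pojo.setCares(cares[j]);
  CaresOut.emit(pojo);
}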

Regards,
Siyuan

On Thu, Sep 29, 2016 at 3:03 PM, Jaikit Jilka  wrote:

> Hello,
>
> I am trying to emit values from an array using a for loop.
> The number of records emitted is correct, but it emits only the last value
> of the array multiple times. How do I emit the different values of the array?
>
> int i = 0;
> for (int j = 0; j < cares.length; j++) {
>     pojo.setID(id);
>     pojo.seturl(url);
>     pojo.setPosition(i);
>     pojo.setCares(cares[j]);
>     i++;
>     CaresOut.emit(pojo);
> }
>
>
> Thank you,
>
> Jaikit Jilka
>


Re: Apex and Malhar Java 8 Certified

2016-10-05 Thread hsy...@gmail.com
I think the problem is what people expect when we say "certified".  To me,
if I see something is certified with Java 8, I would assume that I can use
the Java 8 APIs (new features: streams, lambdas, etc.) to write the operator
code, not just run the code with JRE 8 or compile existing code with JDK 8
and run it.

I did try some operator code with the Java 8 stream API and some lambda
expressions in some methods, and it works. I haven't tried any operators with
new features in their non-transient properties. We should also take a look
to see if Kryo fully works with Java 8 classes/types.

Regards,
Siyuan



On Wed, Oct 5, 2016 at 9:34 AM, Munagala Ramanath 
wrote:

> You can use Java 8 but the source and target compatibility configuration
> parameters in
> your pom.xml for the maven-compiler-plugin still need to be 1.7
>
> Ram
>
> On Wed, Oct 5, 2016 at 9:14 AM, Feldkamp, Brandon (CONT) <
> brandon.feldk...@capitalone.com> wrote:
>
>> So is it safe to say that JDK 1.8 is supported to the same extent that
>> 1.7 is?
>>
>>
>>
>> We’re not running into any issues currently (that I know of…feel free to
>> chime back in, Alex), but we’re making design decisions and were curious
>> about being able to use Java 8 features.
>>
>>
>>
>> Thanks!
>>
>> Brandon
>>
>>
>>
>> *From: *Vlad Rozov 
>> *Organization: *DataTorrent
>> *Reply-To: *"users@apex.apache.org" 
>> *Date: *Monday, October 3, 2016 at 11:43 PM
>> *To: *"users@apex.apache.org" 
>> *Subject: *Re: Apex and Malhar Java 8 Certified
>>
>>
>>
>> We do test on Java 8 - both Apex Core and Malhar Apache Jenkins builds
>> use JDK 1.8 to run tests.
>>
>> Thank you,
>>
>> Vlad
>>
>> On 10/3/16 15:45, Thomas Weise wrote:
>>
>> Apex is built against Java 7 and expected to work as is on Java 8 (Hadoop
>> distros still support 1.7 as well). Are you running into specific issues?
>>
>>
>>
>> Thanks,
>>
>> Thomas
>>
>>
>>
>> On Mon, Oct 3, 2016 at 12:06 PM, McCullough, Alex <
>> alex.mccullo...@capitalone.com> wrote:
>>
>> Hey All,
>>
>>
>>
>> I know there were talks about this at some point but is Apex and/or
>> Malhar Java 8 certified? If not, is there a current plan and date to be so?
>>
>>
>>
>> Thanks,
>>
>> Alex
>>
>>
>>
>
>


Re: Apex and Malhar Java 8 Certified

2016-10-05 Thread hsy...@gmail.com
Some good explanations of why 1.8 source cannot be compiled to pre-1.8 bytecode:

http://stackoverflow.com/questions/16143684/can-java-8-code-be-compiled-to-run-on-java-7-jvm

https://zeroturnaround.com/rebellabs/java-8-explained-default-methods
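
For reference, a maven-compiler-plugin sketch of the combination discussed
below — both levels kept at 1.7, since source 1.8 with target 1.7 is
rejected by javac:

    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <configuration>
        <!-- setting source to 1.8 while target stays 1.7 fails with:
             "source release 1.8 requires target release 1.8" -->
        <source>1.7</source>
        <target>1.7</target>
      </configuration>
    </plugin>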




On Wed, Oct 5, 2016 at 12:15 PM, Munagala Ramanath 
wrote:

> Doing that gives us this lovely message:
>
> [INFO] Compiling 2 source files to /home/ram/tests/apex/
> myapexapp/target/classes
> javacTask: source release 1.8 requires target release 1.8
>
> An interesting page discussing possible problems:
> http://www.draconianoverlord.com/2014/04/01/jdk-compatibility.html
>
> Ram
>
>
> On Wed, Oct 5, 2016 at 10:22 AM, Thomas Weise  wrote:
>
>> Source level can be 1.8, which allows you to use 1.8 features. Did you
>> keep target level at 1.7?
>>
>>
>> On Wed, Oct 5, 2016 at 10:18 AM, hsy...@gmail.com 
>> wrote:
>>
>>> I think the problem is what people expect when we say "certified". To
>>> me, if I see something is certified with Java 8, I would assume that I can
>>> use the Java 8 API (new features: streams, lambdas, etc.) to write operator
>>> code, not just run the code with JRE 8 or compile existing code with JDK 8
>>> and run it.
>>>
>>> I did try some operator code with the Java 8 stream API and some lambda
>>> expressions in a few methods, and it works. I haven't tried any operators
>>> with new Java 8 types in their non-transient properties. We should also
>>> take a look to see whether Kryo fully works with Java 8 classes/types.
>>>
>>> Regards,
>>> Siyuan
>>>
>>>
>>>
>>> On Wed, Oct 5, 2016 at 9:34 AM, Munagala Ramanath 
>>> wrote:
>>>
>>>> You can use Java 8 but the source and target compatibility
>>>> configuration parameters in
>>>> your pom.xml for the maven-compiler-plugin still need to be 1.7
>>>>
>>>> Ram
>>>>
>>>> On Wed, Oct 5, 2016 at 9:14 AM, Feldkamp, Brandon (CONT) <
>>>> brandon.feldk...@capitalone.com> wrote:
>>>>
>>>>> So is it safe to say that JDK 1.8 is supported to the same extent that
>>>>> 1.7 is?
>>>>>
>>>>>
>>>>>
>>>>> We’re not running into any issues currently (that I know of…feel free
>>>>> to chime back in, Alex), but we’re making design decisions and were
>>>>> curious about being able to use Java 8 features.
>>>>>
>>>>>
>>>>>
>>>>> Thanks!
>>>>>
>>>>> Brandon
>>>>>
>>>>>
>>>>>
>>>>> *From: *Vlad Rozov 
>>>>> *Organization: *DataTorrent
>>>>> *Reply-To: *"users@apex.apache.org" 
>>>>> *Date: *Monday, October 3, 2016 at 11:43 PM
>>>>> *To: *"users@apex.apache.org" 
>>>>> *Subject: *Re: Apex and Malhar Java 8 Certified
>>>>>
>>>>>
>>>>>
>>>>> We do test on Java 8 - both Apex Core and Malhar Apache Jenkins builds
>>>>> use JDK 1.8 to run tests.
>>>>>
>>>>> Thank you,
>>>>>
>>>>> Vlad
>>>>>
>>>>> On 10/3/16 15:45, Thomas Weise wrote:
>>>>>
>>>>> Apex is built against Java 7 and expected to work as is on Java 8
>>>>> (Hadoop distros still support 1.7 as well). Are you running into specific
>>>>> issues?
>>>>>
>>>>>
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Thomas
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Oct 3, 2016 at 12:06 PM, McCullough, Alex <
>>>>> alex.mccullo...@capitalone.com> wrote:
>>>>>
>>>>> Hey All,
>>>>>
>>>>>
>>>>>
>>>>> I know there were talks about this at some point but is Apex and/or
>>>>> Malhar Java 8 certified? If not, is there a current plan and date to be 
>>>>> so?
>>>>>
>>>>>
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Alex
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>


Re: Datatorrent fault tolerance

2016-10-07 Thread hsy...@gmail.com
Hey Jaspal,

Did you add any code to the existing KafkaSinglePortExactlyOnceOutputOperator
from Malhar? If so, please make sure the producer you use here
is org.apache.kafka.clients.producer.KafkaProducer instead of
kafka.javaapi.producer.Producer. The latter is the old API, and it is not
supported by MapR Streams.


Regards,
Siyuan
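
A hedged sketch of the difference (topic path, key, and value are
placeholders; the new-client calls shown are standard
org.apache.kafka.clients usage):

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class NewApiProducerSketch
    {
      public static void main(String[] args)
      {
        Properties props = new Properties();
        // needed for vanilla Kafka; MapR Streams resolves topics by path
        props.setProperty("bootstrap.servers", "broker:9092"); // placeholder
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // New client API; for MapR Streams the topic is the full path.
        Producer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<String, String>("/your/stream/path:topicname", "key", "value"));
        producer.flush();
        producer.close();
        // The old kafka.javaapi.producer.Producer (Scala client) is what
        // triggers the NoClassDefFoundError quoted below.
      }
    }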

On Fri, Oct 7, 2016 at 9:01 AM, Jaspal Singh 
wrote:

> Thomas,
>
> Below is the operator implementation we are trying to run. This operator
> gets an object of the Tenant class from the upstream operator.
>
> public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaOutputOperator {
>
>     private static final Logger LOG = LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
>
>     public transient final DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {
>
>         Gson gson = new Gson();
>
>         @Override
>         public void process(Tenant tenant) {
>
>             try {
>                 Producer<String, String> producer = getKafkaProducer();
>                 //ObjectMapper mapper = new ObjectMapper();
>                 long now = System.currentTimeMillis();
>                 //Configuration conf = HBaseConfiguration.create();
>                 //TenantDao dao = new TenantDao(conf);
>                 //ArrayList<Put> puts = new ArrayList<>();
>                 if (tenant != null) {
>                     //Tenant tenant = tenant.next();
>                     if (StringUtils.isNotEmpty(tenant.getGl())) {
>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate", tenant.getVolumeName(), gson.toJson(tenant)));
>                         //puts.add(dao.mkPut(tenant));
>                     } else {
>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error", tenant.getVolumeName(), gson.toJson(tenant)));
>                     }
>                     producer.flush();
>                 }
>             }
>
>
> After building the application, it throws an error during launch:
>
> An error occurred trying to launch the application. Server message:
> java.lang.NoClassDefFoundError: Lkafka/javaapi/producer/Producer;
>     at java.lang.Class.getDeclaredFields0(Native Method)
>     at java.lang.Class.privateGetDeclaredFields(Class.java:2583)
>     at java.lang.Class.getDeclaredFields(Class.java:1916)
>     at
>
>
> Thanks
> Jaspal
>
> On Fri, Oct 7, 2016 at 10:42 AM, Jaspal Singh 
> wrote:
>
>> Thomas,
>>
>> I was trying to refer to the input from the previous operator.
>>
>> Another thing: when we extend the AbstractKafkaOutputOperator, do we need
>> to specify the type parameters? Since we are getting an object of a class
>> type from the previous operator.
>>
>>
>> Thanks
>> Jaspal
>>
>> On Fri, Oct 7, 2016 at 10:12 AM, Thomas Weise  wrote:
>>
>>> Are you referring to the upstream operator in the DAG or the state of
>>> the previous application after relaunch? Since the data is stored in MapR
>>> streams, an operator that is a producer can also act as a consumer. Please
>>> clarify your question.
>>>
>>>
>>> On Fri, Oct 7, 2016 at 7:59 AM, Jaspal Singh >> > wrote:
>>>
 Hi Thomas,

 I have a question: when we are using
 *KafkaSinglePortExactlyOnceOutputOperator* to write results into a
 MapR Streams topic, will it be able to read messages from the previous
 operator?


 Thanks
 Jaspal

 On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise  wrote:

> For recovery you need to set the window data manager like so:
>
> https://github.com/DataTorrent/examples/blob/master/tutorials/exactly-once/src/main/java/com/example/myapexapp/Application.java#L33
>
> That will also apply to stateful restart of the entire application
> (relaunch from previous instance's checkpointed state).
>
> For cold restart, you would need to consider the property you mention
> and decide what is applicable to your use case.
>
> Thomas
>
>
> On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh <
> jaspal.singh1...@gmail.com> wrote:
>
>> Ok, now I get it. Thanks for the nice explanation!!
>>
>> One more thing: you mentioned checkpointing the offset
>> ranges to replay in the same order from Kafka.
>>
>> Is there any property we need to configure to do that, like
>> initialOffset set to APPLICATION_OR_LATEST?
>>
>>
>> Thanks
>> Jaspal
>>
>>
>> On Thursday, October 6, 2016, Thomas Weise 
>> wrote:
>>
>>> What you want is the effect of exactly-once output (that's why we
>>> also call it end-to-end exactly-once). There is no such thing as
>>> exactly-once processing in a distributed system. In this case it would
>>> be rather "produce exactly-once". Upstream operators, on failure, will
>>> recover to checkpointed state and re-process the stream from there. Thi

Re: Datatorrent fault tolerance

2016-10-07 Thread hsy...@gmail.com
Also, which Kafka output operator are you using?
Please use org.apache.apex.malhar.kafka.AbstractKafkaOutputOperator instead
of com.datatorrent.contrib.kafka.AbstractKafkaOutputOperator.
Only the org.apache.apex.malhar.kafka.AbstractKafkaOutputOperator works with
MapR Streams; the latter only works with Kafka 0.8.* or 0.9.

Regards,
Siyuan
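
In code, the distinction is just which class you import (a sketch; the
org.apache.apex.malhar.kafka path is the 0.9-client-based operator):

    // Works with MapR Streams (and the Kafka 0.9 client API):
    import org.apache.apex.malhar.kafka.AbstractKafkaOutputOperator;

    // Old 0.8.x-based operator -- not usable with MapR Streams:
    // import com.datatorrent.contrib.kafka.AbstractKafkaOutputOperator;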

On Fri, Oct 7, 2016 at 10:38 AM, hsy...@gmail.com  wrote:

> Hey Jaspal,
>
> Did you add any code to the existing KafkaSinglePortExactlyOnceOutputOperator
> from Malhar? If so, please make sure the producer you use here
> is org.apache.kafka.clients.producer.KafkaProducer instead of
> kafka.javaapi.producer.Producer. The latter is the old API, and it is not
> supported by MapR Streams.
>
>
> Regards,
> Siyuan
>
> On Fri, Oct 7, 2016 at 9:01 AM, Jaspal Singh 
> wrote:
>
>> Thomas,
>>
>> Below is the operator implementation we are trying to run. This operator
>> gets an object of the Tenant class from the upstream operator.
>>
>> public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaOutputOperator {
>>
>>     private static final Logger LOG = LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
>>
>>     public transient final DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {
>>
>>         Gson gson = new Gson();
>>
>>         @Override
>>         public void process(Tenant tenant) {
>>
>>             try {
>>                 Producer<String, String> producer = getKafkaProducer();
>>                 //ObjectMapper mapper = new ObjectMapper();
>>                 long now = System.currentTimeMillis();
>>                 //Configuration conf = HBaseConfiguration.create();
>>                 //TenantDao dao = new TenantDao(conf);
>>                 //ArrayList<Put> puts = new ArrayList<>();
>>                 if (tenant != null) {
>>                     //Tenant tenant = tenant.next();
>>                     if (StringUtils.isNotEmpty(tenant.getGl())) {
>>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate", tenant.getVolumeName(), gson.toJson(tenant)));
>>                         //puts.add(dao.mkPut(tenant));
>>                     } else {
>>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error", tenant.getVolumeName(), gson.toJson(tenant)));
>>                     }
>>                     producer.flush();
>>                 }
>>             }
>>
>>
>> After building the application, it throws an error during launch:
>>
>> An error occurred trying to launch the application. Server message:
>> java.lang.NoClassDefFoundError: Lkafka/javaapi/producer/Producer;
>>     at java.lang.Class.getDeclaredFields0(Native Method)
>>     at java.lang.Class.privateGetDeclaredFields(Class.java:2583)
>>     at java.lang.Class.getDeclaredFields(Class.java:1916)
>>     at
>>
>>
>> Thanks
>> Jaspal
>>
>> On Fri, Oct 7, 2016 at 10:42 AM, Jaspal Singh > > wrote:
>>
>>> Thomas,
>>>
>>> I was trying to refer to the input from the previous operator.
>>>
>>> Another thing: when we extend the AbstractKafkaOutputOperator, do we need
>>> to specify the type parameters? Since we are getting an object of a class
>>> type from the previous operator.
>>>
>>>
>>> Thanks
>>> Jaspal
>>>
>>> On Fri, Oct 7, 2016 at 10:12 AM, Thomas Weise  wrote:
>>>
>>>> Are you referring to the upstream operator in the DAG or the state of
>>>> the previous application after relaunch? Since the data is stored in MapR
>>>> streams, an operator that is a producer can also act as a consumer. Please
>>>> clarify your question.
>>>>
>>>>
>>>> On Fri, Oct 7, 2016 at 7:59 AM, Jaspal Singh <
>>>> jaspal.singh1...@gmail.com> wrote:
>>>>
>>>>> Hi Thomas,
>>>>>
>>>>> I have a question: when we are using
>>>>> *KafkaSinglePortExactlyOnceOutputOperator* to write results into a
>>>>> MapR Streams topic, will it be able to read messages from the previous
>>>>> operator?
>>>>>
>>>>>
>>>>> Thanks
>>>>> Jaspal
>>>>>
>>>>> On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise  wrote:
>>>>>
>>>>>> For recovery you need to set the window data manager lik

Re: Datatorrent fault tolerance

2016-10-07 Thread hsy...@gmail.com
Jaspal,

Topic is a mandatory property you have to set. In MapR, the value should be
set to the full stream path, for example: /your/stream/path:streamname

Regards,
Siyuan
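
For example, in the properties file (the operator name kafkaOut is an
assumption taken from the error message quoted below):

    <property>
      <name>dt.operator.kafkaOut.prop.topic</name>
      <value>/your/stream/path:streamname</value>
    </property>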

On Fri, Oct 7, 2016 at 11:22 AM, Jaspal Singh 
wrote:

> After making the change, we are getting the below error during application
> launch:
>
> *An error occurred trying to launch the application. Server message:
> javax.validation.ConstraintViolationException: Operator kafkaOut violates
> constraints
> [ConstraintViolationImpl{rootBean=com.example.datatorrent.KafkaSinglePortExactlyOnceOutputOperator@4726f93f,
> propertyPath='topic', message='may not be null', *
>
>
>
> Thanks!!
>
> On Fri, Oct 7, 2016 at 1:13 PM, Jaspal Singh 
> wrote:
>
>> So I just changed the malhar-kafka version to 3.5.0 and I was able to import
>> the AbstractKafkaOutputOperator. Let me try to launch it now.
>>
>> Thanks for your inputs !!
>>
>>
>>
>> On Fri, Oct 7, 2016 at 1:09 PM, Jaspal Singh 
>> wrote:
>>
>>> Should we use malhar-library version 3.5 then?
>>>
>>>
>>> Thanks!!
>>>
>>> On Fri, Oct 7, 2016 at 1:04 PM, Thomas Weise  wrote:
>>>
>>>> Please make sure to depend on version 3.5 of malhar-kafka in pom.xml.
>>>> This operator is not in malhar-library, it's a separate module.
>>>>
>>>>
>>>> On Fri, Oct 7, 2016 at 11:01 AM, Jaspal Singh <
>>>> jaspal.singh1...@gmail.com> wrote:
>>>>
>>>>> Hi Siyuan,
>>>>>
>>>>> I am using the same Kafka producer as you mentioned. But I am not
>>>>> seeing the AbstractKafkaOutputOperator in the Malhar library when importing.
>>>>>
>>>>>
>>>>> Thanks!!
>>>>>
>>>>> On Fri, Oct 7, 2016 at 12:52 PM, hsy...@gmail.com 
>>>>> wrote:
>>>>>
>>>>>> Also, which Kafka output operator are you using?
>>>>>> Please use org.apache.apex.malhar.kafka.AbstractKafkaOutputOperator
>>>>>> instead of com.datatorrent.contrib.kafka.AbstractKafkaOutputOperator.
>>>>>> Only the org.apache.apex.malhar.kafka.AbstractKafkaOutputOperator works
>>>>>> with MapR Streams; the latter only works with Kafka 0.8.* or 0.9.
>>>>>>
>>>>>> Regards,
>>>>>> Siyuan
>>>>>>
>>>>>> On Fri, Oct 7, 2016 at 10:38 AM, hsy...@gmail.com 
>>>>>> wrote:
>>>>>>
>>>>>>> Hey Jaspal,
>>>>>>>
>>>>>>> Did you add any code to the existing
>>>>>>> KafkaSinglePortExactlyOnceOutputOperator
>>>>>>> from Malhar? If so, please make sure the producer you use here is
>>>>>>> org.apache.kafka.clients.producer.KafkaProducer instead of
>>>>>>> kafka.javaapi.producer.Producer. The latter is the old API, and it is
>>>>>>> not supported by MapR Streams.
>>>>>>>
>>>>>>>
>>>>>>> Regards,
>>>>>>> Siyuan
>>>>>>>
>>>>>>> On Fri, Oct 7, 2016 at 9:01 AM, Jaspal Singh <
>>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thomas,
>>>>>>>>
>>>>>>>> Below is the operator implementation we are trying to run. This
>>>>>>>> operator gets an object of the Tenant class from the upstream operator.
>>>>>>>>
>>>>>>>> public class KafkaSinglePortExactlyOnceOutputOperator extends 
>>>>>>>> AbstractKafkaOutputOperator {
>>>>>>>>
>>>>>>>> private static final Logger LOG = 
>>>>>>>> LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
>>>>>>>>
>>>>>>>> public transient final DefaultInputPort<Tenant> in = new
>>>>>>>> DefaultInputPort<Tenant>() {
>>>>>>>>
>>>>>>>> Gson gson = new Gson();
>>>>>>>>
>>>>>>>> @Override
>>>>>>>> public void process(Tenant tenant) {
>>>>>>>>
>>>>>>>> try {
>>>>>>>> Producer<String, String> producer = getKafkaProducer();
>>>>>>>> //ObjectMapper mappe

Re: Datatorrent fault tolerance

2016-10-07 Thread hsy...@gmail.com
Jaspal,

I think you missed the kafkaOut :)

Regards,
Siyuan

On Fri, Oct 7, 2016 at 11:32 AM, Jaspal Singh 
wrote:

> Siyuan,
>
> That's how we have given it in the properties file:
>
> [image: Inline image 1]
>
>
> Thanks!!
>
> On Fri, Oct 7, 2016 at 1:27 PM, hsy...@gmail.com  wrote:
>
>> Jaspal,
>>
>> Topic is a mandatory property you have to set. In MapR, the value should
>> be set to the full stream path, for example: /your/stream/path:streamname
>>
>> Regards,
>> Siyuan
>>
>> On Fri, Oct 7, 2016 at 11:22 AM, Jaspal Singh > > wrote:
>>
>>> After making the change, we are getting the below error during
>>> application launch:
>>>
>>> *An error occurred trying to launch the application. Server message:
>>> javax.validation.ConstraintViolationException: Operator kafkaOut violates
>>> constraints
>>> [ConstraintViolationImpl{rootBean=com.example.datatorrent.KafkaSinglePortExactlyOnceOutputOperator@4726f93f,
>>> propertyPath='topic', message='may not be null', *
>>>
>>>
>>>
>>> Thanks!!
>>>
>>> On Fri, Oct 7, 2016 at 1:13 PM, Jaspal Singh >> > wrote:
>>>
>>>> So I just changed the malhar-kafka version to 3.5.0 and I was able to
>>>> import the AbstractKafkaOutputOperator. Let me try to launch it now.
>>>>
>>>> Thanks for your inputs !!
>>>>
>>>>
>>>>
>>>> On Fri, Oct 7, 2016 at 1:09 PM, Jaspal Singh <
>>>> jaspal.singh1...@gmail.com> wrote:
>>>>
>>>>> Should we use malhar-library version 3.5 then?
>>>>>
>>>>>
>>>>> Thanks!!
>>>>>
>>>>> On Fri, Oct 7, 2016 at 1:04 PM, Thomas Weise  wrote:
>>>>>
>>>>>> Please make sure to depend on version 3.5 of malhar-kafka in pom.xml.
>>>>>> This operator is not in malhar-library, it's a separate module.
>>>>>>
>>>>>>
>>>>>> On Fri, Oct 7, 2016 at 11:01 AM, Jaspal Singh <
>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Siyuan,
>>>>>>>
>>>>>>> I am using the same Kafka producer as you mentioned. But I am not
>>>>>>> seeing the AbstractKafkaOutputOperator in the Malhar library when importing.
>>>>>>>
>>>>>>>
>>>>>>> Thanks!!
>>>>>>>
>>>>>>> On Fri, Oct 7, 2016 at 12:52 PM, hsy...@gmail.com 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Also, which Kafka output operator are you using?
>>>>>>>> Please use org.apache.apex.malhar.kafka.AbstractKafkaOutputOperator
>>>>>>>> instead of com.datatorrent.contrib.kafka.AbstractKafkaOutputOperator.
>>>>>>>> Only the org.apache.apex.malhar.kafka.AbstractKafkaOutputOperator works
>>>>>>>> with MapR Streams; the latter only works with Kafka 0.8.* or 0.9.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Siyuan
>>>>>>>>
>>>>>>>> On Fri, Oct 7, 2016 at 10:38 AM, hsy...@gmail.com >>>>>>> > wrote:
>>>>>>>>
>>>>>>>>> Hey Jaspal,
>>>>>>>>>
>>>>>>>>> Did you add any code to the existing
>>>>>>>>> KafkaSinglePortExactlyOnceOutputOperator
>>>>>>>>> from Malhar? If so, please make sure the producer you use here is
>>>>>>>>> org.apache.kafka.clients.producer.KafkaProducer instead of
>>>>>>>>> kafka.javaapi.producer.Producer. The latter is the old API, and it is
>>>>>>>>> not supported by MapR Streams.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Siyuan
>>>>>>>>>
>>>>>>>>> On Fri, Oct 7, 2016 at 9:01 AM, Jaspal Singh <
>>>>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Thomas,
>>>>>>>>>>
>>>>>>>>>> Below is the operator implementation we are trying to run. This
>>>>>>>>>> operator is getting an object of te

Re: Datatorrent fault tolerance

2016-10-07 Thread hsy...@gmail.com
Oh I see, you want to send to different topics. Well, then you have to give
a dummy value to the topic property on the operator.

Regards,
Siyuan
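
A hypothetical placeholder entry (the operator name kafkaOut and the path
are assumptions; the real topics are chosen inside the quoted process()
code below):

    <property>
      <name>dt.operator.kafkaOut.prop.topic</name>
      <value>/your/stream/path:dummy</value>
    </property>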

On Fri, Oct 7, 2016 at 11:38 AM, Jaspal Singh 
wrote:

> Siyuan,
>
> So for the output operator, we have specified the topic as part of our
> logic itself.
>
> public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaOutputOperator {
>
>     private static final Logger LOG = LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
>
>     public transient final DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {
>
>         Gson gson = new Gson();
>
>         @Override
>         public void process(Tenant tenant) {
>
>             try {
>                 Producer<String, String> producer = getKafkaProducer();
>                 //ObjectMapper mapper = new ObjectMapper();
>                 long now = System.currentTimeMillis();
>                 //Configuration conf = HBaseConfiguration.create();
>                 //TenantDao dao = new TenantDao(conf);
>                 //ArrayList<Put> puts = new ArrayList<>();
>                 if (tenant != null) {
>                     //Tenant tenant = tenant.next();
>                     if (StringUtils.isNotEmpty(tenant.getGl())) {
>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate", tenant.getVolumeName(), gson.toJson(tenant)));
>                         //puts.add(dao.mkPut(tenant));
>                     } else {
>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error", tenant.getVolumeName(), gson.toJson(tenant)));
>                     }
>                     producer.flush();
>                 }
>             }
>
>
>
> Thanks!!
>
> On Fri, Oct 7, 2016 at 1:34 PM, hsy...@gmail.com  wrote:
>
>> Jaspal,
>>
>> I think you miss the kafkaOut  :)
>>
>> Regards,
>> Siyuan
>>
>> On Fri, Oct 7, 2016 at 11:32 AM, Jaspal Singh > > wrote:
>>
>>> Siyuan,
>>>
>>> That's how we have given it in properties file:
>>>
>>> [image: Inline image 1]
>>>
>>>
>>> Thanks!!
>>>
>>> On Fri, Oct 7, 2016 at 1:27 PM, hsy...@gmail.com 
>>> wrote:
>>>
>>>> Jaspal,
>>>>
>>>> Topic is a mandatory property you have to set. In MapR, the value
>>>> should be set to the full stream path, for example:
>>>> /your/stream/path:streamname
>>>>
>>>> Regards,
>>>> Siyuan
>>>>
>>>> On Fri, Oct 7, 2016 at 11:22 AM, Jaspal Singh <
>>>> jaspal.singh1...@gmail.com> wrote:
>>>>
>>>>> After making the change, we are getting the below error during
>>>>> application launch:
>>>>>
>>>>> *An error occurred trying to launch the application. Server message:
>>>>> javax.validation.ConstraintViolationException: Operator kafkaOut violates
>>>>> constraints
>>>>> [ConstraintViolationImpl{rootBean=com.example.datatorrent.KafkaSinglePortExactlyOnceOutputOperator@4726f93f,
>>>>> propertyPath='topic', message='may not be null', *
>>>>>
>>>>>
>>>>>
>>>>> Thanks!!
>>>>>
>>>>> On Fri, Oct 7, 2016 at 1:13 PM, Jaspal Singh <
>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>
>>>>>> So I just changed the malhar-kafka version to 3.5.0 and I was able to
>>>>>> import the AbstractKafkaOutputOperator. Let me try to launch it now.
>>>>>>
>>>>>> Thanks for your inputs !!
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Oct 7, 2016 at 1:09 PM, Jaspal Singh <
>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>
>>>>>>> Should we use malhar-library version 3.5 then?
>>>>>>>
>>>>>>>
>>>>>>> Thanks!!
>>>>>>>
>>>>>>> On Fri, Oct 7, 2016 at 1:04 PM, Thomas Weise  wrote:
>>>>>>>
>>>>>>>> Please make sure to depend on version 3.5 of malhar-kafka in
>>>>>>>> pom.xml. This operator is not in malhar-library, it's a separate 
>>>>>>>> module.
>>>>>>>>
>>

Re: [EXTERNAL] kafka

2016-11-01 Thread hsy...@gmail.com
Hey Raja,

The setup for a secure Kafka input operator is not easy. You can follow
these steps:
1. Follow the Kafka documentation to set up your brokers properly
   (https://kafka.apache.org/090/documentation.html#security_overview)
2. You have to manually create the client JAAS file
   (https://kafka.apache.org/090/documentation.html#security_overview).
   A sample file would look like this:

   KafkaClient {
       com.sun.security.auth.module.Krb5LoginModule required
       useKeyTab=true
       storeKey=true
       keyTab="/etc/security/keytabs/kafka_client.keytab"
       principal="kafka-clien...@example.com";
   };

3. On the operator you have to set the attribute JVM_OPTS to
   -Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf

4. On the operator you have to set the consumer properties, for example:

   set dt.operator.$your_operator_name.{consumerProps.security.protocol} to
   SASL or whichever security protocol you use;
   set dt.operator.$your_operator_name.{consumerProps.sasl.kerberos.service.name}
   to kafka
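
   Putting steps 3 and 4 together, a hypothetical properties-file sketch
   (the operator name kafkaIn and the exact key spellings are assumptions
   following the pattern above):

       <property>
         <name>dt.operator.kafkaIn.attr.JVM_OPTS</name>
         <value>-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf</value>
       </property>
       <property>
         <name>dt.operator.kafkaIn.prop.consumerProps.security.protocol</name>
         <value>SASL_PLAINTEXT</value>
       </property>
       <property>
         <name>dt.operator.kafkaIn.prop.consumerProps.sasl.kerberos.service.name</name>
         <value>kafka</value>
       </property>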


Hope this helps you!


Regards,
Siyuan



On Sun, Oct 30, 2016 at 10:56 PM, Raja.Aravapalli <
raja.aravapa...@target.com> wrote:

>
>
> Hi Team,
>
>
>
> Can someone please help me with the below requested information?
>
>
>
> Does apache apex have any inbuilt kafka input operator to read from Kafka
> 0.9 secure kafka topics?
>
>
>
> Thanks a lot.
>
>
>
> Regards,
>
> Raja.
>
>
>
> *From: *"Raja.Aravapalli" 
> *Reply-To: *"users@apex.apache.org" 
> *Date: *Monday, October 24, 2016 at 2:29 PM
> *To: *"users@apex.apache.org" 
> *Subject: *[EXTERNAL] kafka
>
>
>
>
>
> Hi,
>
>
>
> Do we have any kafka operator readily available to consume messages from
> secure kafka topics in kafka 0.9 clusters?
>
>
>
> Thanks a lot.
>
>
>
>
>
> Regards,
>
> Raja.
>