Re: [EXTERNAL] Re: hdfs file write operator is increasing the latency - resulting entire DAG to fail

2017-07-13 Thread Raja . Aravapalli

Some more information on the application:


Kafka input operator ==> Deserialization of Avro ==> Enrich the message with some 
text ==> Unifier (auto-generated) ==> write to HDFS

Kafka input operator: running in 10 instances with the ONE_TO_MANY setting
Deserialization of Avro: running in 10 instances with parallel partition
Enrich the message with some text: running in 10 instances with parallel partition
Unifier: running in a SINGLE instance, accumulating all the messages from the 10 
partitions; receiving approx. 1000 msgs per sec; memory set to 20 GB
Write to HDFS: running in a SINGLE instance, collecting all the messages from the 
Unifier; receiving approx. 1000 msgs per sec; memory set to 20 GB

Please advise.


Regards,
Raja.

From: Pramod Immaneni 
Reply-To: "users@apex.apache.org" 
Date: Thursday, July 13, 2017 at 11:27 AM
To: "users@apex.apache.org" 
Subject: Re: [EXTERNAL] Re: hdfs file write operator is increasing the latency 
- resulting entire DAG to fail

If the data can be written to different files, then you can have multiple 
partitions, with each partition writing to a disjoint set of files. You 
cannot have two partitions writing to the same file.

As the file output operator has the ability for the implementation to supply a 
filename for every tuple, you could provide different filenames in the 
different partitions. To group data belonging to the same file to go to the 
same partition, you may need to specify a stream codec. Please see 
https://ci.apache.org/projects/apex-core/apex-core-javadoc-release-3.6/com/datatorrent/api/StreamCodec.html
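
To illustrate the routing idea, here is a minimal plain-Java sketch. It does not use the Apex API; the class, the file-naming rule and the hash-based partition choice are assumptions made for illustration only. In Apex, the equivalent decision would presumably be made in the partition method of a custom stream codec, so that all tuples destined for one file reach the same partition.

```java
/** Plain-Java illustration of key-based routing; not part of the Apex API. */
final class FileNamePartitioning {

    /** Hypothetical rule: the text before the first comma selects the target file. */
    static String fileNameFor(String tuple) {
        int comma = tuple.indexOf(',');
        String key = comma < 0 ? tuple : tuple.substring(0, comma);
        return "hdfs_data_" + key;
    }

    /** Maps a file name to a partition index in the range [0, partitionCount). */
    static int partitionFor(String fileName, int partitionCount) {
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(fileName.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        String[] tuples = {"orders,1", "clicks,2", "orders,3"};
        for (String tuple : tuples) {
            String file = fileNameFor(tuple);
            System.out.println(tuple + " -> " + file + " -> partition " + partitionFor(file, 4));
        }
    }
}
```

Because the partition is derived only from the file name, two tuples that map to the same file can never end up in different partitions, which is the property the file output operator needs.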

To specify the number of partitions, for example as 4, you can use the 
following attribute


<property>
  <name>dt.operator.HDFS_operator.attr.PARTITIONER</name>
  <value>com.datatorrent.common.partitioner.StatelessPartitioner:4</value>
</property>


Second, the rate you mentioned, 2 MB/s, isn't too high for a single partition, so I 
am wondering if there is something else going on that increases latencies. In your 
implementation of the operator, are you doing any buffering or any heavy 
processing?

Thanks

On Thu, Jul 13, 2017 at 9:07 AM, Raja.Aravapalli wrote:

Thanks for the response Pramod.


-  My HDFS operator is running in a single partition, with an input of 
approx. 1000 msgs per sec. I am not sure how to partition this operator ☹

-  I am not really sure how to check the bytes/sec, but I expect it 
will be huge, because my msg size in Kafka is approx. 2 KB: input of 1000 
msgs per sec * 2 KB == approx. 2 MB per sec [rough calculation]

-  And for your info, right now I have set the memory for this operator 
to 20 GB using the property below, which I feel is very large.

<property>
  <name>dt.operator.HDFS_operator.attr.MEMORY_MB</name>
  <value>20480</value>
</property>
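
The rough calculation above can be written out as a short sketch. The class and method names are hypothetical, and the 2 KB message size is taken as 2048 bytes, which is an assumption.

```java
/** Back-of-the-envelope write-rate estimate; names are illustrative only. */
final class WriteRateEstimate {

    /** Bytes written per second for a given message rate and message size. */
    static long bytesPerSecond(long messagesPerSecond, long bytesPerMessage) {
        return messagesPerSecond * bytesPerMessage;
    }

    /** Converts bytes per second to MiB per second (1 MiB = 1024 * 1024 bytes). */
    static double mebibytesPerSecond(long bytesPerSecond) {
        return bytesPerSecond / (1024.0 * 1024.0);
    }

    public static void main(String[] args) {
        long rate = bytesPerSecond(1000, 2 * 1024);   // 1000 msgs/sec at 2 KB each
        System.out.println(rate + " bytes/sec");      // 2048000 bytes/sec
        System.out.println(mebibytesPerSecond(rate) + " MiB/sec");
    }
}
```

At 1000 messages per second this comes to just under 2 MiB per second, which matches the estimate in the message.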



Please advise.


Thanks a lot.

Raja.

From: Pramod Immaneni
Reply-To: "users@apex.apache.org"
Date: Thursday, July 13, 2017 at 10:31 AM
To: "users@apex.apache.org"
Subject: [EXTERNAL] Re: hdfs file write operator is increasing the latency - 
resulting entire DAG to fail

Hi Raja,

How many partitions do you have for the file output operator, and what would you 
say your data write rate is in bytes/second?

Thanks

On Thu, Jul 13, 2017 at 8:13 AM, Raja.Aravapalli wrote:
Team,

We have an Apex application that is reading from Kafka and writing to HDFS.

The data flow for the Kafka topic is very heavy, say 2500 messages per sec!!

The issue we are facing is:

The operator (which extends AbstractFileOutputOperator) that writes to HDFS is 
building up latency over time and eventually failing. Can someone please share your 
thoughts on how I can handle this?


Thanks a lot.


Regards,
Raja.




Re: [EXTERNAL] Re: How to Change memory setting for a unifier?

2017-06-20 Thread Raja . Aravapalli

Thanks Sandesh. That helps.


Regards,
Raja.

From: Sandesh Hegde 
Reply-To: "users@apex.apache.org" 
Date: Tuesday, June 20, 2017 at 6:00 PM
To: "users@apex.apache.org" 
Subject: Re: [EXTERNAL] Re: How to Change memory setting for a unifier?

To find (almost) everything, the source is the answer:
https://github.com/apache/apex-core/blob/master/api/src/main/java/com/datatorrent/api/Context.java

On Tue, Jun 20, 2017 at 2:13 PM, Raja.Aravapalli wrote:
Thanks a lot for the response.

Do we have any web link that gives details about all the possible configurations 
and properties?



Thanks
Raja.


From: Pramod Immaneni
Reply-To: "users@apex.apache.org"
Date: Tuesday, June 20, 2017 at 3:22 PM
To: "users@apex.apache.org"
Subject: [EXTERNAL] Re: How to Change memory setting for a unifier?

Hi Raja,

To set the memory for the unifier you would use the port to identify the 
unifier, so something like

dt.operator.oper1.port.port1.unifier.MEMORY_MB

or use the setUnifierAttribute in DAG if setting it from java.

Thanks

On Tue, Jun 20, 2017 at 1:13 PM, Raja.Aravapalli wrote:

Hi,

Can someone please help me with the configuration property to increase 
the memory setting for a Unifier?

I am using the settings below to increase the memory for the Application Master and 
operators of a DAG:


<property>
  <name>dt.attr.MASTER_MEMORY_MB</name>
  <value>10240</value>
</property>

<property>
  <name>dt.operator.*.attr.MEMORY_MB</name>
  <value>10240</value>
</property>



UseCase/Scenario:

I have an Apex application that reads from Kafka in 3 partitions with the 
setting “ONE_TO_MANY” and sends the messages to the following operators. The 
problem is that, even though the messages are read through multiple partitions, 
I see a unifier operation happening after the messages are read from the 
Kafka partitions. This is becoming a bottleneck as the number of messages 
flowing through increases.

I have configured the memory of all the operators to 10 GB with the setting 
given above, but the unifier that was introduced takes only 2 GB by 
default and fails with heap space problems.

Hence I wish to increase the memory for the unifier.

Thanks a lot in advance.


Regards,
Raja.






Re: [EXTERNAL] Re: File output operator - HDFS Write failing

2017-03-24 Thread Raja . Aravapalli

Hi Pramod,

Below are the versions I am using for Apex core & Malhar.

3.4.0
3.6.0


code of File Output operator:

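import java.nio.charset.StandardCharsets;

import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;

// Writes every incoming String tuple as one line of a single HDFS file.
public class WriteToHdfs extends AbstractFileOutputOperator<String> {

    // Name of the output file; every tuple goes to this same file.
    String HDFS_FILENAME = "hdfs_data";

    public WriteToHdfs() {}

    public WriteToHdfs(String fileName) {
        HDFS_FILENAME = fileName;
    }

    // Returns the same file name for every tuple.
    @Override
    protected String getFileName(String s) {
        return HDFS_FILENAME;
    }

    // Encodes the tuple as UTF-8 and appends a newline.
    @Override
    protected byte[] getBytesForTuple(String s) {
        return (s + "\n").getBytes(StandardCharsets.UTF_8);
    }

}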


Regards,
Raja.

From: Pramod Immaneni 
Reply-To: "users@apex.apache.org" 
Date: Thursday, March 23, 2017 at 10:08 PM
To: "users@apex.apache.org" 
Subject: [EXTERNAL] Re: File output operator - HDFS Write failing

Hi Raja,

What version of Malhar are you using? Are you extending the 
AbstractFileOutputOperator or are you using a stock implementation from Malhar?

Thanks

On Thu, Mar 23, 2017 at 4:57 AM, Raja.Aravapalli wrote:

Hi Team,

I have a file output operator that writes data into HDFS files. It 
worked fine for a day, then started to fail with the exception below:

java.lang.RuntimeException: org.apache.hadoop.ipc.RemoteException(java.lang.ArrayIndexOutOfBoundsException): java.lang.ArrayIndexOutOfBoundsException
    at com.datatorrent.lib.io.fs.AbstractFileOutputOperator.setup(AbstractFileOutputOperator.java:428)
    at com.datatorrent.lib.io.fs.AbstractFileOutputOperator.setup(AbstractFileOutputOperator.java:112)
    at com.datatorrent.stram.engine.Node.setup(Node.java:187)
    at com.datatorrent.stram.engine.StreamingContainer.setupNode(StreamingContainer.java:1309)
    at com.datatorrent.stram.engine.StreamingContainer.access$100(StreamingContainer.java:130)
    at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1388)
Caused by: org.apache.hadoop.ipc.RemoteException(java.lang.ArrayIndexOutOfBoundsException): java.lang.ArrayIndexOutOfBoundsException
    at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1552)
    at org.apache.hadoop.ipc.Client.call(Client.java:1496)
    at org.apache.hadoop.ipc.Client.call(Client.java:1396)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:233)
    at com.sun.proxy.$Proxy10.getBlockLocations(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:270)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:278)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:194)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:176)
    at com.sun.proxy.$Proxy11.getBlockLocations(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1236)
    at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1223)
    at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1211)
    at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:309)
    at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:274)
    at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:266)
    at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1536)
    at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:330)
    at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:326)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:326)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:782)
    at com.datatorrent.lib.io.fs.AbstractFileOutputOperator.recoverFile(AbstractFileOutputOperator.java:455)
    at com.datatorrent.lib.io.fs.AbstractFileOutputOperator.setup(AbstractFileOutputOperator.java:373)
    ... 5 more
2017-03-23 06:53:47,942 INFO engine.StreamingContainer (StreamingContainer.java:processHeartbeatResponse(799)) - Undeploy request: [6]
2017-03-23 06:53:47,944 INFO engine.StreamingContainer (StreamingContainer.java:undeploy(561)) - Undeploy complete.



Can someone pls help me fix this. It is breaking our production job!

Thanks a lot.



-  Raja.



Re: [EXTERNAL] Re: KafkaSinglePortInputOperator

2016-12-07 Thread Raja . Aravapalli

Thanks a tonnn for the support today Chaitanya!!

We were able to successfully download messages from Kafka SSL Secured Topics!!

Thank you very much!!


Regards,
Raja.

From: Chaitanya Chebolu 
Reply-To: "users@apex.apache.org" 
Date: Wednesday, December 7, 2016 at 11:28 AM
To: "users@apex.apache.org" 
Subject: Re: [EXTERNAL] Re: KafkaSinglePortInputOperator

Raja,

  The issue is that the SSL properties (ssl.*.*) are not reaching the Kafka consumer.
  Could you please share the complete project?

Thanks,
Chaitanya


On Wed, Dec 7, 2016 at 7:39 AM, Raja.Aravapalli wrote:
Hi Chaitanya,

Any other thoughts on how I can fix this??

Does Apex not yet support SSL secured topics?


Thanks a lot.

Regards,
Raja.


Re: [EXTERNAL] Re: KafkaSinglePortInputOperator

2016-12-06 Thread Raja . Aravapalli


I added the line below as suggested… I do not see any exceptions either.

Still nothing is happening ☹

I am not sure why the properties below always show as null… even though I set 
them in my Application.java class!! Any help on how to set these properties???

ssl.keystore.location = null
ssl.truststore.location = null
ssl.keystore.password = null


Thanks a lot in advance.

Regards,
Raja.

From: Chaitanya Chebolu 
Reply-To: "users@apex.apache.org" 
Date: Tuesday, December 6, 2016 at 5:17 PM
To: "users@apex.apache.org" 
Subject: Re: [EXTERNAL] Re: KafkaSinglePortInputOperator

Raja,

   Please set the consumerProps on the KafkaSinglePortInputOperator.
   Add the line below in your application:
  KafkaSinglePortInputOperator in = dag.addOperator("in", new KafkaSinglePortInputOperator());
  --
   in.setConsumerProps(props);

 Please let me know if you are still facing issues.
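
As a hedged illustration of the advice above, the sketch below builds the SSL consumer properties in plain Java and checks that none of them is missing before they are handed to the operator with setConsumerProps. The class name, the helper methods and the file paths are hypothetical; the property keys are the ones that appear in the ConsumerConfig log in this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Builds and sanity-checks SSL consumer properties; illustrative helper only. */
final class SslConsumerProps {

    /** Keys that showed up as null or PLAINTEXT in the consumer log. */
    static final String[] REQUIRED = {
        "security.protocol",
        "ssl.truststore.location",
        "ssl.truststore.password",
        "ssl.keystore.location",
        "ssl.keystore.password"
    };

    static Properties build(String truststore, String truststorePassword,
                            String keystore, String keystorePassword) {
        Properties props = new Properties();
        props.setProperty("security.protocol", "SSL");
        props.setProperty("ssl.truststore.location", truststore);
        props.setProperty("ssl.truststore.password", truststorePassword);
        props.setProperty("ssl.keystore.location", keystore);
        props.setProperty("ssl.keystore.password", keystorePassword);
        return props;
    }

    /** Returns the required keys that are absent or empty. */
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            String value = props.getProperty(key);
            if (value == null || value.isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Paths and passwords are placeholders, not values from the thread.
        Properties props = build("/path/to/client.truststore.jks", "changeit",
                                 "/path/to/server.keystore.jks", "changeit");
        System.out.println("Missing keys: " + missingKeys(props));
    }
}
```

If the check reports no missing keys but the consumer log still shows null values, the properties object is most likely not reaching the operator at all.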

Regards,
Chaitanya



Re: [EXTERNAL] Re: KafkaSinglePortInputOperator

2016-12-06 Thread Raja . Aravapalli

Find below the log I am observing:

2016-12-06 05:17:37,264 INFO  kafka.AbstractKafkaInputOperator (AbstractKafkaInputOperator.java:initPartitioner(311)) - Initialize Partitioner
2016-12-06 05:17:37,265 INFO  kafka.AbstractKafkaInputOperator (AbstractKafkaInputOperator.java:initPartitioner(324)) - Actual Partitioner is class org.apache.apex.malhar.kafka.OneToOnePartitioner
2016-12-06 05:17:37,280 INFO  consumer.ConsumerConfig (AbstractConfig.java:logAll(165)) - ConsumerConfig values:
metric.reporters = []
metadata.max.age.ms = 30
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
group.id = org.apache.apex.malhar.kafka.AbstractKafkaInputOperatorMETA_GROUP
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [10.66.137.116:9093]
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.keystore.type = JKS
ssl.trustmanager.algorithm = PKIX
enable.auto.commit = false
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 6
connections.max.idle.ms = 54
ssl.truststore.password = null
session.timeout.ms = 3
metrics.num.samples = 2
client.id =
ssl.endpoint.identification.algorithm = null
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
ssl.protocol = TLS
check.crcs = true
request.timeout.ms = 4
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 32768
ssl.cipher.suites = null
ssl.truststore.type = JKS
security.protocol = PLAINTEXT
ssl.truststore.location = null
ssl.keystore.password = null
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 3
fetch.min.bytes = 1024
send.buffer.bytes = 131072
auto.offset.reset = latest

2016-12-06 05:17:37,385 INFO  utils.AppInfoParser (AppInfoParser.java:<init>(82)) - Kafka version : 0.9.0.0
2016-12-06 05:17:37,385 INFO  utils.AppInfoParser (AppInfoParser.java:<init>(83)) - Kafka commitId : fc7243c2af4b2b4a



Regards,
Raja.

From: Chaitanya Chebolu 
Reply-To: "users@apex.apache.org" 
Date: Tuesday, December 6, 2016 at 4:28 PM
To: "users@apex.apache.org" 
Subject: [EXTERNAL] Re: KafkaSinglePortInputOperator

Hi Raja,

  Could you please share the Application Master logs and Kafka operator 
container logs.

Regards,
Chaitanya

On Tue, Dec 6, 2016 at 4:17 PM, Raja.Aravapalli wrote:

Hi Team,

I am using “KafkaSinglePortInputOperator” to connect to an SSL secured topic in 
Kafka 0.9!!

Unfortunately… my Apex application is not going to the “RUNNING” state…!! It only 
stays in the ACCEPTED state and then goes into the FAILED state!! I don’t see much 
information in the logs…!! ☹

Can someone please help fix the issue…. We have an immediate need to read messages 
from Kafka 0.9 SSL configured topics…

Please advise!


Thanks very much in advance.


Regards,
Raja.



Re: [EXTERNAL] Re: KafkaSinglePortInputOperator

2016-12-06 Thread Raja . Aravapalli

Thanks a lot for the response Chaitanya!!

Sharing more details for your reference and suggestions!!

Application.java:

import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.contrib.kafka.KafkaSinglePortByteArrayInputOperator;
import com.datatorrent.contrib.kafka.KafkaSinglePortStringInputOperator;
import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator;
import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;
import org.apache.hadoop.conf.Configuration;

import java.util.Properties;


public class Application implements StreamingApplication {

    public void populateDAG(DAG dag, Configuration conf)
    {
        // SSL settings intended for the Kafka consumer.
        // Note: props is built here but never handed to the operator in this version.
        Properties props = new Properties();

        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", "client.truststore.jks");
        props.put("ssl.truststore.password", "**");
        props.put("ssl.keystore.location", "server.keystore.jks");
        props.put("ssl.keystore.password", "**");
        props.put("schema.registry.url", "http://:8081");
        props.put("enable.auto.commit", "false");

        // Kafka 0.9 input operator reading the topic from the earliest offset.
        KafkaSinglePortInputOperator in = dag.addOperator("in", new KafkaSinglePortInputOperator());
        in.setInitialPartitionCount(1);
        in.setTopics("***");
        in.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
        in.setClusters(":9093");

        // Writes each message as a line to a file on HDFS.
        LineOutputOperator out = dag.addOperator("out", new LineOutputOperator());
        out.setFilePath("hdfs://**/*");
        out.setFileName("test");
        out.setMaxLength(1024);

        dag.addStream("data", in.outputPort, out.input);
    }

}


Also, sharing POM.xml below:


http://maven.apache.org/POM/4.0.0;
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
4.0.0

com.tgt.hdp.mighty
test-apex
1.0-SNAPSHOT


3.4.0
lib/*.jar
2.7.1.2.3.4.0-3485
1.1.2.2.3.4.0-3485
0.9.0.1
0.9.0.1-cp1
2.0.1
1.7.7
1.1
2.9.1
0.38
4.10




HDPReleases
HDP Releases

http://repo.hortonworks.com/content/repositories/releases/
default


HDP Jetty Hadoop
HDP Jetty Hadoop

http://repo.hortonworks.com/content/repositories/jetty-hadoop/
default


confluent
http://packages.confluent.io/maven





org.apache.apex
malhar-library
${apex.version}




org.apache.apex
apex-common
${apex.version}
provided


junit
junit
${junit.version}
test


org.apache.apex
apex-engine
${apex.version}
test



org.apache.apex
malhar-contrib
${apex.version}



   org.apache.apex
malhar-kafka
${apex.version}



org.apache.avro
avro
${avro.version}



org.apache.kafka
kafka_2.11
${confluent.kafka.version}



io.confluent
kafka-avro-serializer
${kafka.avro.srlzr.version}


log4j
log4j


org.slf4j
slf4j-log4j12





com.googlecode.json-simple
json-simple
${json.version}



joda-time
joda-time
${jodatime.version}



de.javakaffee
kryo-serializers
${kyroserializer.version}







org.apache.maven.plugins
maven-eclipse-plugin
2.9

true



maven-compiler-plugin
3.3

UTF-8
1.7
1.7
true
false
true
true



maven-dependency-plugin
2.8


copy-dependencies
prepare-package

copy-dependencies



Re: [EXTERNAL] kafka

2016-11-02 Thread Raja . Aravapalli

Thanks a lot for the response Pramod.

Our Kafka environment is NOT Kerberized! But the Kafka topic is secured via SSL 
certificates!

Only hosts with valid SSL certificates can connect! Any thoughts in this 
regard will be really helpful!!

Thanks a lot!!


Regards,
Raja.

From: Pramod Immaneni 
Reply-To: "users@apex.apache.org" 
Date: Tuesday, November 1, 2016 at 3:24 AM
To: "users@apex.apache.org" 
Subject: Re: [EXTERNAL] kafka

Hi Raja,

Yes we have had success reading from kerberized secure Kafka. If I remember 
correctly we had to set a few attributes like container jvm options in Apex. Is 
your setup Kerberos secured Kafka?

Thanks

On Oct 30, 2016, at 10:56 PM, Raja.Aravapalli 
> wrote:

Hi Team,

Can someone pls help me with below requested information ?

Does apache apex have any inbuilt kafka input operator to read from Kafka 0.9 
secure kafka topics?

Thanks a lot.

Regards,
Raja.

From: "Raja.Aravapalli" 
>
Reply-To: "users@apex.apache.org" 
>
Date: Monday, October 24, 2016 at 2:29 PM
To: "users@apex.apache.org" 
>
Subject: [EXTERNAL] kafka


Hi,

Do we have any Kafka operator readily available to consume messages from secure 
Kafka topics in Kafka 0.9 clusters?

Thanks a lot.


Regards,
Raja.


kafka

2016-10-24 Thread Raja . Aravapalli

Hi,

Do we have any Kafka operator readily available to consume messages from secure 
Kafka topics in Kafka 0.9 clusters?

Thanks a lot.


Regards,
Raja.


Re: HDFS Write File Operator struggling to start

2016-08-05 Thread Raja . Aravapalli

Thanks for the response Ashwin!!

Once I moved the HDFS file that the logs reported as corrupted to a 
different location on HDFS, the application was able to launch a new container 
successfully and processing went fine. I still need to check the first container 
of the operator to find what actually caused the file to become corrupt.


Would having a single HDFS file collect approx. 15 GB of data without any rotation 
set cause any issues? Also, is it okay, or a best practice, not to set any 
rotation on the file?



Thanks for the response again!


Regards,
Raja.

From: Ashwin Chandra Putta 
>
Reply-To: "users@apex.apache.org" 
>
Date: Friday, August 5, 2016 at 5:45 PM
To: "users@apex.apache.org" 
>
Subject: Re: HDFS Write File Operator struggling to start

Can you check in the logs the first time the issue occurred for what triggered 
it? Look for the first container that failed.

Regards,
Ashwin

On Fri, Aug 5, 2016 at 12:58 PM, Raja.Aravapalli 
> wrote:

Hi Ashwin,

It happens when the application is running!!


Regards,
Raja.

From: Ashwin Chandra Putta 
>
Reply-To: "users@apex.apache.org" 
>
Date: Thursday, August 4, 2016 at 4:42 PM
To: "users@apex.apache.org" 
>
Subject: Re: HDFS Write File Operator struggling to start

Raja,

When do you see this issue? Does it happen while the application is running? 
Does this happen while restarting a failed application? Or does this happen 
while starting a new application?

Regards,
Ashwin.

On Thu, Aug 4, 2016 at 11:25 AM, Samba Surabhi 
> wrote:
If it is the issue with file size, you can rotate the output file.

writer.setAlwaysWriteToTmp(true);

writer.setRotationWindows(240);
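
To get a feel for how often a rotation window count of 240 rotates the file, the arithmetic can be sketched in plain Java. This assumes the Apex default streaming window of 500 ms and an application window count of 1, neither of which is confirmed in this thread; the class and method names are illustrative and not part of Malhar.

```java
import java.time.Duration;

// Illustrative helper, not part of Apex or Malhar.
final class RotationIntervalSketch {

    // Assumed default streaming window size in milliseconds.
    static final long ASSUMED_WINDOW_MILLIS = 500;

    /** Time between rotations when the file rotates every rotationWindows windows. */
    static Duration rotationInterval(int rotationWindows, long windowMillis) {
        return Duration.ofMillis(rotationWindows * windowMillis);
    }

    public static void main(String[] args) {
        Duration d = rotationInterval(240, ASSUMED_WINDOW_MILLIS);
        System.out.println("Rotation roughly every " + d.getSeconds() + " seconds");
    }
}
```

Under those assumptions the file would rotate about every two minutes, so a smaller or larger window count can be chosen to match the file sizes wanted.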

Thanks,

Samba Surabhi.


From: raja.aravapa...@target.com
To: users@apex.apache.org
Subject: HDFS Write File Operator struggling to start
Date: Thu, 4 Aug 2016 14:49:16 +



Hi

I have an HDFS file write operator in my DAG, which is failing to start a new 
operator and keeps on trying to start one.

It created approx. 800 temporary files in the destination HDFS directory. How 
can I fix this issue and debug the root cause?

All I can see in the container log is a "File corrupted" message.


Can someone please help me fix this ?


Regards,
Raja.



--

Regards,
Ashwin.



--

Regards,
Ashwin.


Re: HDFS Write File Operator struggling to start

2016-08-05 Thread Raja . Aravapalli

Hi Ashwin,

It happens when the application is running!!


Regards,
Raja.

From: Ashwin Chandra Putta 
>
Reply-To: "users@apex.apache.org" 
>
Date: Thursday, August 4, 2016 at 4:42 PM
To: "users@apex.apache.org" 
>
Subject: Re: HDFS Write File Operator struggling to start

Raja,

When do you see this issue? Does it happen while the application is running? 
Does this happen while restarting a failed application? Or does this happen 
while starting a new application?

Regards,
Ashwin.

On Thu, Aug 4, 2016 at 11:25 AM, Samba Surabhi 
> wrote:
If it is the issue with file size, you can rotate the output file.

writer.setAlwaysWriteToTmp(true);

writer.setRotationWindows(240);

Thanks,

Samba Surabhi.


From: raja.aravapa...@target.com
To: users@apex.apache.org
Subject: HDFS Write File Operator struggling to start
Date: Thu, 4 Aug 2016 14:49:16 +



Hi

I have an HDFS file write operator in my DAG, which is failing to start a new 
operator and keeps on trying to start one.

It created approx. 800 temporary files in the destination HDFS directory. How 
can I fix this issue and debug the root cause?

All I can see in the container log is a "File corrupted" message.


Can someone please help me fix this ?


Regards,
Raja.



--

Regards,
Ashwin.


HDFS Write File Operator struggling to start

2016-08-04 Thread Raja . Aravapalli

Hi

I have an HDFS file write operator in my DAG, which is failing to start a new 
operator and keeps on trying to start one.

It created approx. 800 temporary files in the destination HDFS directory. How 
can I fix this issue and debug the root cause?

All I can see in the container log is a "File corrupted" message.


Can someone please help me fix this ?


Regards,
Raja.


Re: supported file formats today ?

2016-08-02 Thread Raja . Aravapalli

Hi

Can someone help me with my question in the email below?

I want to write data into Hadoop HDFS as and when it arrives, but in 
different Hadoop-supported file formats such as RC files or ORC files.

Do we have some operator in Apex for this?


Thanks a lot in advance.


Regards,
Raja.

From: "Raja.Aravapalli" 
>
Reply-To: "users@apex.apache.org" 
>
Date: Thursday, July 28, 2016 at 2:12 PM
To: "users@apex.apache.org" 
>
Subject: supported file formats today ?


Hi

Curious to know whether Apex has any operators in Malhar that support writing 
Sequence, RC, ORC, or Avro format files into HDFS, other than text 
files?

Thanks.

Regards,
Raja.


supported file formats today ?

2016-07-28 Thread Raja . Aravapalli

Hi

Curious to know whether Apex has any operators in Malhar that support writing 
Sequence, RC, ORC, or Avro format files into HDFS, other than text 
files?

Thanks.

Regards,
Raja.


Re: DAG failing with "Invalid AMRMToken from app attempt_*" in log

2016-07-26 Thread Raja . Aravapalli

Thanks for the response Ram.

We are on Hadoop 2.7.1 !

Still I am facing the issue !!

Regards,
Raja.

From: Munagala Ramanath >
Reply-To: "users@apex.apache.org" 
>
Date: Tuesday, July 26, 2016 at 4:26 PM
To: "users@apex.apache.org" 
>
Subject: Re: DAG failing with "Invalid AMRMToken from app attempt_*" in log

Which version of Hadoop ? If it is 2.6.X, upgrading to 2.7.X might fix the 
problem, e.g.
https://marc.info/?l=hadoop-user&m=142501302419779&w=2

Ram

On Tue, Jul 26, 2016 at 1:59 PM, Raja.Aravapalli 
> wrote:

Hi,

My DAG fails at least once every week with the information below in the log.


Invalid AMRMToken from app attempt_***


Can someone please share your thoughts on why I am seeing this and how I can 
fix it ?

Thanks a lot in advance.



Regards,
Raja.



DAG failing with "Invalid AMRMToken from app attempt_*" in log

2016-07-26 Thread Raja . Aravapalli

Hi,

My DAG fails at least once every week with the information below in the log.


Invalid AMRMToken from app attempt_***


Can someone please share your thoughts on why I am seeing this and how I can 
fix it ?

Thanks a lot in advance.



Regards,
Raja.


Restart of a failed application with "-originalAppId"

2016-07-23 Thread Raja . Aravapalli

Hi,

When I tried to restart a failed application with the “-originalAppId” parameter, 
it took a very long time, approx. 45 minutes, to actually launch the application 
again.

Is this expected behaviour? Or can someone share thoughts on whether I am 
doing something wrong?



Regards,
Raja.


Re: hdfs output file operator

2016-07-23 Thread Raja . Aravapalli

Thanks for the response Chinmay.

Yes, this issue is during restart of a failed application.

As per my observation, my DAG/application failed during a Resource Manager 
failover. During the failover, something went wrong with one of the files being 
written to HDFS. The application tried many times to restore the file, launching 
the operator that writes to HDFS in many containers, and failed.

When I restarted the application, it again tried many times to 
restore the HDFS file and still launched many containers to recover. The app 
took a very long time, some 4 to 5 hours, to successfully launch those HDFS 
operators and resume.


Regards,
Raja.

From: Chinmay Kolhatkar 
>
Reply-To: "users@apex.apache.org" 
>
Date: Saturday, July 23, 2016 at 12:11 AM
To: "users@apex.apache.org" 
>
Subject: Re: hdfs output file operator

Hi Raja,

I can see such a log message in AbstractFileOutputOperator at line 455.

As this code is called from setup of the operator, the operator is getting 
deployed and then failing while restoring the existing file because of a mismatch 
between the length of the file and the offset the operator has stored previously.

From the code it looks like it takes care of such cases and restores the file.

From what I understand, either the file got changed in some other way or the 
offset management has a problem.
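
The restore step described here can be modelled in a few lines of plain Java. This is a hypothetical sketch, not the code in AbstractFileOutputOperator: the file is represented as a byte array, all names are invented, and treating a file shorter than the stored offset as unrecoverable is an assumption.

```java
import java.util.Arrays;

// Hypothetical model of offset-based file recovery; not Malhar code.
final class RestoreSketch {

    /**
     * Returns the content the file should have after recovery: everything up to
     * the offset recorded at the last checkpoint. Bytes written after that
     * offset are dropped, on the assumption that they will be replayed.
     */
    static byte[] restore(byte[] fileContent, long storedOffset) {
        if (storedOffset < 0 || storedOffset > fileContent.length) {
            // Assumption: data the checkpoint expects is missing, so give up.
            throw new IllegalStateException("file shorter than stored offset " + storedOffset);
        }
        return Arrays.copyOf(fileContent, (int) storedOffset);
    }

    public static void main(String[] args) {
        byte[] onDisk = "abcdefghij".getBytes();
        System.out.println(new String(restore(onDisk, 6)));
    }
}
```

If the file length and stored offset already agree, the content comes back unchanged; the mismatch case is the one that triggers a recovery copy.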

Are you restarting the application from previous application Id?

To narrow down the problem, can you please try to change the destination path 
and see if that works?

Thanks,
Chinmay.



On Sat, Jul 23, 2016 at 5:00 AM, Sandesh Hegde 
> wrote:
Please check,
 1. AppMaster logs
 2. Cluster resources

On Fri, Jul 22, 2016 at 1:14 PM Raja.Aravapalli 
> wrote:

Hi,

I have a file output operator which writes to HDFS files.

The application has been trying for a long time to deploy the operator that 
writes to HDFS files in many different containers, but it is not succeeding. 
The status is showing as PENDING_DEPLOY.

In the logs of the container in which the application is trying to deploy the 
HDFS write operator, I can only see "path corrupted".


Can someone please guide me on this?



Regards,
Raja.



hdfs output file operator

2016-07-22 Thread Raja . Aravapalli

Hi,

I have a file output operator which writes to HDFS files.

The application has been trying for a long time to deploy the operator that 
writes to HDFS files in many different containers, but it is not succeeding. 
The status is showing as PENDING_DEPLOY.

In the logs of the container in which the application is trying to deploy the 
HDFS write operator, I can only see "path corrupted".


Can someone please guide me on this?



Regards,
Raja.


delegation token not found in cache

2016-07-21 Thread Raja . Aravapalli

Hi,

Can someone please help me understand why my DAG is failing with the exception 
below after running successfully for 8 days?



Caused by: 
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
 token (HDFS_DELEGATION_TOKEN token 00 for ) can't be found in cache
at org.apache.hadoop.ipc.Client.call(Client.java:1427)
at org.apache.hadoop.ipc.Client.call(Client.java:1358)




Thanks a lot.

Regards,
Raja.


Re: DAG is failing due to memory issues

2016-07-12 Thread Raja . Aravapalli

Sure Sandesh Thanks.

Also, one quick question,

When does the memory of the Application Master grow?

Does the memory of the AM depend on the number of operators in the pipeline?

One issue I observed with my DAG is:

The memory of the application master keeps growing and, after reaching the max. 
memory allowed, it is killed/failed. After the max. allowed attempts, the 
entire DAG fails.

I wish to know why the size of my AM is growing and how to control it, so that 
the Application Master doesn’t fail and eventually take the entire DAG down.


Regards,
Raja.

From: Sandesh Hegde >
Reply-To: "users@apex.apache.org" 
>
Date: Tuesday, July 12, 2016 at 2:43 PM
To: "users@apex.apache.org" 
>
Subject: Re: DAG is failing due to memory issues

UI Memory = Total Memory - AppMaster Memory

DAG size can vary between setups because the max size of a container is 
defined by the YARN parameter mentioned above.

Apex does the following:

if (csr.container.getRequiredMemoryMB() > maxMem) {
  LOG.warn("Container memory {}m above max threshold of cluster. Using max value {}m.",
      csr.container.getRequiredMemoryMB(), maxMem);
  csr.container.setRequiredMemoryMB(maxMem);
}

On Tue, Jul 12, 2016 at 10:21 AM Raja.Aravapalli 
> wrote:

Hi,


What memory does “allocated mem.” on the UI refer to for a DAG? The Application 
Master's memory, or the memory of the operators' containers?


[cid:B61FE0C9-4767-4FF8-9E23-454CB502C53C]


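I included the properties below as well and re-triggered the DAG, but it is still 
showing only 32GB.


<property>
  <name>dt.application..attr.MASTER_MEMORY_MB</name>
  <value>4096</value>
</property>

<property>
  <name>dt.application..operator.*.attr.MEMORY_MB</name>
  <value>4096</value>
</property>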



I have the same DAG running in another Hadoop environment, where it shows 
approx. 125gb, but in this environment it shows only 32gb, which is what I am 
assuming to be the problem.


Regards,
Raja.


From: Sandesh Hegde >
Reply-To: "users@apex.apache.org" 
>
Date: Tuesday, July 12, 2016 at 11:35 AM

To: "users@apex.apache.org" 
>
Subject: Re: DAG is failing due to memory issues

Raja,

Please increase the container size and launch the app again.  
yarn.scheduler.maximum-allocation-mb is for the container and not for the DAG, 
and the error message you showed is for the container.

Here is one quick way: use the following attribute.

<property>
  <name>dt.operator.*.attr.MEMORY_MB</name>
  <value>4096</value>
</property>



On Tue, Jul 12, 2016 at 9:24 AM Raja.Aravapalli 
> wrote:

Hi Ram,

Sorry, I did not share the details of the 32gb with you.

I am saying 32gb is allocated because I observed that on the UI while the 
application was running. But now that the DAG has failed, I cannot take a 
screenshot and send it.


Regards,
Raja.

From: Munagala Ramanath >
Reply-To: "users@apex.apache.org" 
>
Date: Tuesday, July 12, 2016 at 11:06 AM

To: "users@apex.apache.org" 
>
Subject: Re: DAG is failing due to memory issues

How do you know it is allocating 32GB ? The diagnostic message you posted does 
not show
that.

Ram

On Tue, Jul 12, 2016 at 8:51 AM, Raja.Aravapalli 
> wrote:

Thanks for the response Sandesh.

Since our yarn-site is configured with value 32768 for the property 
yarn.scheduler.maximum-allocation-mb, it is allocating a max of 32gb and not 
more than that!!


Wish to know, is there a way I can increase the max allowed value ? OR, since 
it is configured in yarn-site.xml, I cannot increase it ?



Regards,
Raja.

From: Sandesh Hegde >
Reply-To: "users@apex.apache.org" 
>
Date: Tuesday, July 12, 2016 at 10:46 AM
To: "users@apex.apache.org" 
>
Subject: Re: DAG is failing due to memory issues

Quoting from the doc shared by Ram, those parameters control operator 
memory size.


 actual container memory allocated by RM has to lie between

[yarn.scheduler.minimum-allocation-mb, yarn.scheduler.maximum-allocation-mb]
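
The bound quoted here amounts to a clamp. A minimal sketch, using the 1024 and 32768 values reported for this cluster elsewhere in the thread; the helper name is invented, and a real scheduler may also round the request, which this sketch ignores.

```java
// Illustrative only; YARN's own normalization logic is not reproduced here.
final class ContainerMemorySketch {

    /** Clamps a requested container size into [minMb, maxMb]. */
    static int clampMemoryMb(int requestedMb, int minMb, int maxMb) {
        return Math.max(minMb, Math.min(requestedMb, maxMb));
    }

    public static void main(String[] args) {
        int min = 1024;   // yarn.scheduler.minimum-allocation-mb
        int max = 32768;  // yarn.scheduler.maximum-allocation-mb
        System.out.println(clampMemoryMb(4096, min, max));
        System.out.println(clampMemoryMb(512, min, max));
        System.out.println(clampMemoryMb(65536, min, max));
    }
}
```

A 4096 MB operator request sits comfortably inside the range, so the 32768 ceiling limits a single container, not the sum over all containers of the DAG.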

On Tue, Jul 12, 2016 at 8:38 AM Raja.Aravapalli 
> wrote:

Hi Ram,

I see in the cluster yarn-site.xml, below two properties are 

Re: DAG is failing due to memory issues

2016-07-12 Thread Raja . Aravapalli

Hi,


What memory does “allocated mem.” on the UI refer to for a DAG? The Application 
Master's memory, or the memory of the operators' containers?


[cid:B61FE0C9-4767-4FF8-9E23-454CB502C53C]


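I included the properties below as well and re-triggered the DAG, but it is still 
showing only 32GB.


<property>
  <name>dt.application..attr.MASTER_MEMORY_MB</name>
  <value>4096</value>
</property>

<property>
  <name>dt.application..operator.*.attr.MEMORY_MB</name>
  <value>4096</value>
</property>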



I have the same DAG running in another Hadoop environment, where it shows 
approx. 125gb, but in this environment it shows only 32gb, which is what I am 
assuming to be the problem.


Regards,
Raja.


From: Sandesh Hegde >
Reply-To: "users@apex.apache.org" 
>
Date: Tuesday, July 12, 2016 at 11:35 AM
To: "users@apex.apache.org" 
>
Subject: Re: DAG is failing due to memory issues

Raja,

Please increase the container size and launch the app again.  
yarn.scheduler.maximum-allocation-mb is for the container and not for the DAG, 
and the error message you showed is for the container.

Here is one quick way: use the following attribute.

<property>
  <name>dt.operator.*.attr.MEMORY_MB</name>
  <value>4096</value>
</property>



On Tue, Jul 12, 2016 at 9:24 AM Raja.Aravapalli 
> wrote:

Hi Ram,

Sorry, I did not share the details of the 32gb with you.

I am saying 32gb is allocated because I observed that on the UI while the 
application was running. But now that the DAG has failed, I cannot take a 
screenshot and send it.


Regards,
Raja.

From: Munagala Ramanath >
Reply-To: "users@apex.apache.org" 
>
Date: Tuesday, July 12, 2016 at 11:06 AM

To: "users@apex.apache.org" 
>
Subject: Re: DAG is failing due to memory issues

How do you know it is allocating 32GB ? The diagnostic message you posted does 
not show
that.

Ram

On Tue, Jul 12, 2016 at 8:51 AM, Raja.Aravapalli 
> wrote:

Thanks for the response Sandesh.

Since our yarn-site is configured with value 32768 for the property 
yarn.scheduler.maximum-allocation-mb, it is allocating a max of 32gb and not 
more than that!!


Wish to know, is there a way I can increase the max allowed value ? OR, since 
it is configured in yarn-site.xml, I cannot increase it ?



Regards,
Raja.

From: Sandesh Hegde >
Reply-To: "users@apex.apache.org" 
>
Date: Tuesday, July 12, 2016 at 10:46 AM
To: "users@apex.apache.org" 
>
Subject: Re: DAG is failing due to memory issues

Quoting from the doc shared by Ram, those parameters control operator 
memory size.


 actual container memory allocated by RM has to lie between

[yarn.scheduler.minimum-allocation-mb, yarn.scheduler.maximum-allocation-mb]

On Tue, Jul 12, 2016 at 8:38 AM Raja.Aravapalli 
> wrote:

Hi Ram,

I see that in the cluster yarn-site.xml, the two properties below are configured 
with the following settings:

yarn.scheduler.minimum-allocation-mb ===> 1024
yarn.scheduler.maximum-allocation-mb ===> 32768


So with the above settings at cluster level, I can’t increase the memory 
allocated for my DAG? Is there any other way I can increase the memory?


Thanks a lot.


Regards,
Raja.

From: Munagala Ramanath >
Reply-To: "users@apex.apache.org" 
>
Date: Tuesday, July 12, 2016 at 9:31 AM
To: "users@apex.apache.org" 
>
Subject: Re: DAG is failing due to memory issues

Please see: http://docs.datatorrent.com/troubleshooting/#configuring-memory

Ram

On Tue, Jul 12, 2016 at 6:57 AM, Raja.Aravapalli 
> wrote:

Hi,

My DAG is failing with container memory issues. I see the information below in 
the log.



Diagnostics: Container [pid=xxx,containerID=container_xyclksdjf] is running 
beyond physical memory limits. Current usage: 1.0 GB of 1 GB physical memory 
used; 2.9 GB of 2.1 GB virtual memory used. Killing container.


Can someone help me on how I can fix this issue. Thanks a lot.



Regards,
Raja.




DAG is failing due to memory issues

2016-07-12 Thread Raja . Aravapalli

Hi,

My DAG is failing with container memory issues. I see the information below in 
the log.



Diagnostics: Container [pid=xxx,containerID=container_xyclksdjf] is running 
beyond physical memory limits. Current usage: 1.0 GB of 1 GB physical memory 
used; 2.9 GB of 2.1 GB virtual memory used. Killing container.
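
The two limits in this diagnostic are related: the virtual memory limit is the physical limit multiplied by yarn.nodemanager.vmem-pmem-ratio, whose default is 2.1. A small sketch of that arithmetic, assuming the default ratio is in effect on this cluster (the thread does not confirm it) and using an invented helper name:

```java
// Illustrative arithmetic only; names are not from YARN.
final class VmemLimitSketch {

    // Default value of yarn.nodemanager.vmem-pmem-ratio.
    static final double DEFAULT_VMEM_PMEM_RATIO = 2.1;

    /** Virtual memory limit in MB for a container with the given physical limit. */
    static double vmemLimitMb(int physicalLimitMb, double ratio) {
        return physicalLimitMb * ratio;
    }

    public static void main(String[] args) {
        double limitMb = vmemLimitMb(1024, DEFAULT_VMEM_PMEM_RATIO);
        System.out.printf("virtual limit: %.1f GB%n", limitMb / 1024.0);
    }
}
```

In the log, physical usage had also reached the 1 GB limit, so a larger container size raises both limits together.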


Can someone help me on how I can fix this issue. Thanks a lot.



Regards,
Raja.


Re: hashing

2016-07-07 Thread Raja . Aravapalli


Also, to share some info on the partitioner I am using:

I am using the StatelessPartitioner with the configuration below:


<property>
  <name>dt.operator.MyOperator.attr.PARTITIONER</name>
  <value>com.datatorrent.common.partitioner.StatelessPartitioner:3</value>
</property>


Thanks.

Regards,
Raja.

From: "Raja.Aravapalli" 
>
Date: Thursday, July 7, 2016 at 3:20 PM
To: "users@apex.apache.org" 
>
Subject: hashing


Hi,

I have an operator which is running in 3 instances, i.e. partitions. I want 
all the records with the same key (my key is of "String" type) to be transferred 
to the same instance/partition.

But I am unable to achieve this with my codec below.



import com.datatorrent.lib.codec.KryoSerializableStreamCodec;

public class MyCodec extends KryoSerializableStreamCodec<String> {

    @Override
    public int getPartition(String tuple) {
        // Tuples are '^'-delimited; the second field is the partitioning key.
        String[] toSplit = tuple.split("\\^");
        String exId = toSplit[1];

        return exId.hashCode();
    }

}


Any guidance please…


Thanks a lot.


Regards,
Raja.


hashing

2016-07-07 Thread Raja . Aravapalli

Hi,

I have an operator which is running in 3 instances, i.e. partitions. I want 
all the records with the same key (my key is of "String" type) to be transferred 
to the same instance/partition.

But I am unable to achieve this with my codec below.



import com.datatorrent.lib.codec.KryoSerializableStreamCodec;

public class MyCodec extends KryoSerializableStreamCodec<String> {

    @Override
    public int getPartition(String tuple) {
        // Tuples are '^'-delimited; the second field is the partitioning key.
        String[] toSplit = tuple.split("\\^");
        String exId = toSplit[1];

        return exId.hashCode();
    }

}
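
A sketch of how the value returned by a codec like this is probably mapped onto the 3 partitions. It assumes the default behaviour as it is commonly described: the platform masks the returned hash with enough low-order bits to cover the partition count and spreads the resulting key values over the partitions. The class and method names are illustrative and the logic is not copied from Apex source.

```java
// Illustrative model of mask-based partition routing; not Apex code.
final class PartitionMaskSketch {

    /** Smallest all-ones bit mask that can represent partitionCount distinct keys. */
    static int maskFor(int partitionCount) {
        int bits = 32 - Integer.numberOfLeadingZeros(partitionCount - 1);
        return (1 << bits) - 1;
    }

    /** Partition index for a hash: mask first, then spread keys round-robin. */
    static int partitionFor(int partitionHash, int partitionCount) {
        int key = partitionHash & maskFor(partitionCount);
        return key % partitionCount;
    }

    /** Same key extraction as the codec: second '^'-delimited field. */
    static int partitionForTuple(String tuple, int partitionCount) {
        String exId = tuple.split("\\^")[1];
        return partitionFor(exId.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        System.out.println(partitionForTuple("a^id1^x", 3));
        System.out.println(partitionForTuple("b^id1^y", 3));
    }
}
```

Under this model, 3 partitions use the mask 0x3, so four key values are shared among three partitions and one partition receives about twice the traffic. Records with the same key still land on the same partition, provided the codec is actually applied to the downstream operator's input port.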


Any guidance please…


Thanks a lot.


Regards,
Raja.


Console is not showing any data about application

2016-07-06 Thread Raja . Aravapalli

Hi,

I am using the community edition, and when I click the running application in 
the console UI, it does not show any data and instead shows:

An error occurred fetching data.


Any help on what could be the problem ?


Thank you.


Regards,
Raja.


Re: sticky partitioning

2016-07-06 Thread Raja . Aravapalli

Hi Sandesh,

Thanks for the response.

What I am looking for is sticky-key partitioning.

Something like: records with the same key A go to the same instance of an operator. 
[ Sticky Partition vs Round Robin @ 
http://docs.datatorrent.com/application_development/#sticky-partition-vs-round-robin]

Can you please share a reference code link?


Regards,
Raja.

From: Sandesh Hegde >
Reply-To: "users@apex.apache.org" 
>
Date: Wednesday, July 6, 2016 at 10:06 AM
To: "users@apex.apache.org" 
>
Subject: Re: sticky partitioning

Default stateless partitioning is the one you want.

Here is the Sample code, which uses that.

https://github.com/DataTorrent/streaming-benchmarks/blob/master/apex-benchmarks/src/main/java/apex/benchmark/ApplicationWithGenerator.java




On Wed, Jul 6, 2016 at 7:57 AM Raja.Aravapalli 
> wrote:

Hi,

Can someone please point me to sample code or a link that uses sticky-key 
partitioning in Apex?

I couldn’t locate any.

Thanks a lot.


Regards,
Raja.


sticky partitioning

2016-07-06 Thread Raja . Aravapalli

Hi,

Can someone please point me to sample code or a link that uses sticky-key 
partitioning in Apex?

I couldn’t locate any.

Thanks a lot.


Regards,
Raja.


Runtime property modifications for an Apex Application

2016-07-05 Thread Raja . Aravapalli

Hi,

Is it possible to set the DEBUG property below for a running application 
using the apex command line tool, without killing or shutting down the 
application?

<property> <name>dt.attr.DEBUG</name> <value>true</value> </property>


If so, can someone help me with the command to achieve this? Thank you.



Regards,
Raja.


Re: how to increase lifetime of hdfs delegation tokens ?

2016-07-05 Thread Raja . Aravapalli

Hi Pramod,

As suggested in step 2, I added the properties you provided below and ran a new 
job to see the debug lines "number of tokens: " and "updated token: ".

And, as you said, I can see those lines in the application log now.

Does this mean the renewal is working as expected? So, will my application 
run continuously even after 7 days now? We have only changed refresh times here; 
how does this ensure the application runs indefinitely?




Regards,
Raja.

From: "Raja.Aravapalli" 
>
Reply-To: "users@apex.apache.org" 
>
Date: Saturday, July 2, 2016 at 9:52 AM
To: "users@apex.apache.org" 
>
Subject: Re: how to increase lifetime of hdfs delegation tokens ?


Thanks Pramod, I will try these suggestions and let you know. Thanks a lot.


Regards,
Raja.

From: Pramod Immaneni >
Reply-To: "users@apex.apache.org" 
>
Date: Friday, July 1, 2016 at 8:36 PM
To: "users@apex.apache.org" 
>
Subject: Re: how to increase lifetime of hdfs delegation tokens ?

Hi Raja,

Some questions for you and also some options for you to verify without waiting 
for a long time.

1. Do you see a warning message like "No keytab specified for refreshing 
tokens, application may not be able to run indefinitely" when the application is 
being launched from the command line?

2. For testing, set the parameters below in your dt-site.xml and launch your 
application. This will make the application think that the tokens are only 
valid for 5 minutes and within 5 * 0.7 = 3.5 minutes (token refresh factor is 
0.7) the application should try to get new tokens. It should print the debug 
lines "number of tokens: " and "updated token: " in the application master 
logs. The application master logs are in the log file of the first container of 
the application. Let me know if you see those log lines.


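<property>
  <name>dt.resourcemanager.delegation.token.max-lifetime</name>
  <value>30</value>
</property>

<property>
  <name>dt.namenode.delegation.token.max-lifetime</name>
  <value>30</value>
</property>

<property>
  <name>dt.attr.DEBUG</name>
  <value>true</value>
</property>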
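
The refresh timing in step 2 is simple arithmetic and can be sketched in plain Java. The 5-minute lifetime and the 0.7 factor come from the message; expressing the factor as an integer percentage and the helper names are assumptions made for this illustration, not Apex APIs.

```java
import java.time.Duration;

// Illustrative arithmetic; names are not from Apex.
final class TokenRefreshSketch {

    /** Delay before a refresh is attempted, as a percentage of the token lifetime. */
    static Duration refreshDelay(Duration tokenLifetime, int refreshPercent) {
        return Duration.ofMillis(tokenLifetime.toMillis() * refreshPercent / 100);
    }

    public static void main(String[] args) {
        Duration delay = refreshDelay(Duration.ofMinutes(5), 70);
        System.out.println("refresh after " + delay.getSeconds() + " seconds");
    }
}
```

With a 7-day lifetime the same factor puts the first refresh attempt a little under 5 days in, which is why shortening the lifetime makes the behaviour observable within minutes.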


For more information about application auto-fetching new tokens read here

https://github.com/apache/apex-core/blob/master/docs/security.md

Thanks

On Fri, Jul 1, 2016 at 1:08 PM, Raja.Aravapalli 
> wrote:

Thanks a lot Pramod. Will wait for your response.


Regards,
Raja.

From: Pramod Immaneni >
Reply-To: "users@apex.apache.org" 
>
Date: Friday, July 1, 2016 at 10:56 AM

To: "users@apex.apache.org" 
>
Subject: Re: how to increase lifetime of hdfs delegation tokens ?

Hi Raja,

Let me look at this and get back to you.

Thanks

On Thu, Jun 30, 2016 at 11:20 PM, Raja.Aravapalli 
> wrote:

Can someone pls help me, how can I ensure, my apex application doesn’t fail 
after 7days…

Thanks a lot.


Regards,
Raja.

From: "Raja.Aravapalli" 
>
Reply-To: "users@apex.apache.org" 
>
Date: Thursday, June 30, 2016 at 6:06 AM

To: "users@apex.apache.org" 
>
Subject: Re: how to increase lifetime of hdfs delegation tokens ?


Hi,

I triggered my application by specifying properties, 
“dt.authentication.principal” & “dt.authentication.keytab” , BUT, did not 
specify the property “dt.authentication.store.keytab”.

I also observed the keytab is copied to hdfs location 
“/user//datatorrent”. But, still my apex application failed after 7days!!!

I am setting these properties in “properties.xml” file!

How can I verify that my settings are working correctly? Having to wait 7 days to 
learn of a failure is very tough. I hope there are some other 
alternatives.

Can someone pls help me fix this ….  Thanks a lot !!


Regards,
Raja.

From: "Raja.Aravapalli" 
>
Reply-To: "users@apex.apache.org" 
>
Date: Monday, June 20, 2016 at 5:43 PM
To: "users@apex.apache.org" 
>
Subject: Re: how to increase lifetime of hdfs delegation tokens ?


Sure Pramod. Please respond on this mail chain when 

Re: how to increase lifetime of hdfs delegation tokens ?

2016-07-02 Thread Raja . Aravapalli

Thanks Pramod, I will try these suggestions and let you know. Thanks a lot.


Regards,
Raja.

From: Pramod Immaneni >
Reply-To: "users@apex.apache.org" 
>
Date: Friday, July 1, 2016 at 8:36 PM
To: "users@apex.apache.org" 
>
Subject: Re: how to increase lifetime of hdfs delegation tokens ?

Hi Raja,

Some questions for you and also some options for you to verify without waiting 
for a long time.

1. Do you see a warning message like "No keytab specified for refreshing 
tokens, application may not be able to run indefinitely" when the application is 
being launched from the command line?

2. For testing, set the parameters below in your dt-site.xml and launch your 
application. This will make the application think that the tokens are only 
valid for 5 minutes and within 5 * 0.7 = 3.5 minutes (token refresh factor is 
0.7) the application should try to get new tokens. It should print the debug 
lines "number of tokens: " and "updated token: " in the application master 
logs. The application master logs are in the log file of the first container of 
the application. Let me know if you see those log lines.


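<property>
  <name>dt.resourcemanager.delegation.token.max-lifetime</name>
  <value>30</value>
</property>

<property>
  <name>dt.namenode.delegation.token.max-lifetime</name>
  <value>30</value>
</property>

<property>
  <name>dt.attr.DEBUG</name>
  <value>true</value>
</property>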


For more information about application auto-fetching new tokens read here

https://github.com/apache/apex-core/blob/master/docs/security.md

Thanks

On Fri, Jul 1, 2016 at 1:08 PM, Raja.Aravapalli 
> wrote:

Thanks a lot Pramod. Will wait for your response.


Regards,
Raja.

From: Pramod Immaneni >
Reply-To: "users@apex.apache.org" 
>
Date: Friday, July 1, 2016 at 10:56 AM

To: "users@apex.apache.org" 
>
Subject: Re: how to increase lifetime of hdfs delegation tokens ?

Hi Raja,

Let me look at this and get back to you.

Thanks

On Thu, Jun 30, 2016 at 11:20 PM, Raja.Aravapalli 
> wrote:

Can someone pls help me, how can I ensure, my apex application doesn’t fail 
after 7days…

Thanks a lot.


Regards,
Raja.

From: "Raja.Aravapalli" 
>
Reply-To: "users@apex.apache.org" 
>
Date: Thursday, June 30, 2016 at 6:06 AM

To: "users@apex.apache.org" 
>
Subject: Re: how to increase lifetime of hdfs delegation tokens ?


Hi,

I triggered my application by specifying properties, 
“dt.authentication.principal” & “dt.authentication.keytab” , BUT, did not 
specify the property “dt.authentication.store.keytab”.

I also observed the keytab is copied to hdfs location 
“/user//datatorrent”. But, still my apex application failed after 7days!!!

I am setting these properties in “properties.xml” file!

How can I verify that my settings are working correctly? Having to wait 7 days to 
learn of a failure is very tough. I hope there are some other 
alternatives.

Can someone pls help me fix this ….  Thanks a lot !!


Regards,
Raja.

From: "Raja.Aravapalli" 
>
Reply-To: "users@apex.apache.org" 
>
Date: Monday, June 20, 2016 at 5:43 PM
To: "users@apex.apache.org" 
>
Subject: Re: how to increase lifetime of hdfs delegation tokens ?


Sure Pramod. Please respond on this mail chain when you get to know..

Thanks very much.


Regards,
Raja.

From: Pramod Immaneni >
Reply-To: "users@apex.apache.org" 
>
Date: Monday, June 20, 2016 at 4:54 PM
To: "users@apex.apache.org" 
>
Subject: Re: how to increase lifetime of hdfs delegation tokens ?

Raja,

I believe it would. I will check and get back to you but the easiest way for 
you to check is that the file should appear in HDFS under 
/user//datatorrent with the same filename as it is in your local 
filesystem.

Thanks

On Mon, Jun 20, 2016 at 2:40 PM, Raja.Aravapalli 
> wrote:

Thanks for the response Pramod.

My quick question is, I see we should mention 

Re: Application restarts

2016-06-21 Thread Raja . Aravapalli

Thanks Thomas, application is running fine then. Was just curious to know about 
the errors.


Regards,
Raja.

From: Thomas Weise >
Reply-To: "users@apex.apache.org" 
>
Date: Tuesday, June 21, 2016 at 2:32 PM
To: "users@apex.apache.org" 
>
Subject: Re: Application restarts

This looks like the check of the last known container from the previous attempt 
when the AM recovers from saved state. You can ignore this message if the 
application comes back up and resumes processing. There is no data loss, the 
application resumes from checkpointed state.

On Tue, Jun 21, 2016 at 8:02 AM, Raja.Aravapalli 
> wrote:

Thanks Priyanka, this information helps.

Also, when I restart, I see the ERROR exception below in the logs. Do we 
know what it means, and would it cause any data loss?



2016-06-21 09:48:18,995 ERROR stram.StreamingAppMasterService 
(StreamingAppMasterService.java:onGetContainerStatusError(1164)) - Failed to 
query the status of container_e3_1463086279244_457551_01_1114442
org.apache.hadoop.security.token.SecretManager$InvalidToken: No NMToken sent 
for ***some_hostname



Thanks.


Regards,
Raja.


From: Priyanka Gugale 
>
Reply-To: "users@apex.apache.org" 
>
Date: Tuesday, June 21, 2016 at 12:58 AM

To: "users@apex.apache.org" 
>
Subject: Re: Application restarts

Can you try @BindMap? Check the Kryo README for how to use it: 
https://github.com/EsotericSoftware/kryo
It looks like it allows us to use different serializers for Map key and value 
fields.

-Priyanka

On Tue, Jun 21, 2016 at 10:59 AM, Raja.Aravapalli 
> wrote:

Hi Ram,

I have a variable like below,

private TreeMap varMap;


During restarts, the DateTime key in the map is failing with serialization 
exceptions…

Can you please help me now….  How do I annotate this variable so that only the 
DateTime key uses the JodaDateTime serialization ?

Thanks a lot.


Regards,
Raja.

From: "Raja.Aravapalli" 
>
Reply-To: "users@apex.apache.org" 
>
Date: Monday, June 20, 2016 at 6:48 PM

To: "users@apex.apache.org" 
>
Subject: Re: Application restarts


Thanks Ram.

I will test and let you know.


Regards,
Raja.

From: Munagala Ramanath >
Reply-To: "users@apex.apache.org" 
>
Date: Monday, June 20, 2016 at 5:50 PM
To: "users@apex.apache.org" 
>
Subject: Re: Application restarts

If your field type is org.joda.time.DateTime and declared thus:

DateTime jodaDateTime;

How about just adding these imports:

import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
import de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer;

and adding this annotation just above the field declaration, like so:

@Bind(JodaDateTimeSerializer.class)
DateTime jodaDateTime;


Ram

On Mon, Jun 20, 2016 at 3:20 PM, Raja.Aravapalli 
> wrote:

Thanks a lot Ram for the response.

Yes, I am relaunching with application Id.

The usage information in the documentation links is a bit heavy for 
the knowledge I have… do we have a simple example on how to use it?

By the way, I want to use the JodaTime DateTime serializer, which is already 
available I believe. Any help for using JodaDateTimeSerializer for DateTime ?

Thanks a lot.


Regards,
Raja.

From: Munagala Ramanath >
Reply-To: "users@apex.apache.org" 
>
Date: Monday, June 20, 2016 at 4:32 PM
To: "users@apex.apache.org" 
>
Subject: Re: Application restarts

By "relaunches" I assume you mean you're launching with the previous 
application ID ?

When you do that, the platform attempts to restore state from the previously 
saved serialized form.
To do that, it needs to first create the object with a no-arg constructor and 
then populate it.
If that constructor is absent, you get the exception.
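
The "create first, then populate" step can be illustrated without any Apex or Kryo dependency. The sketch below uses plain Java reflection as a stand-in for what a serializer does when it re-creates an object; the class names `WithNoArg`, `WithoutNoArg` and `NoArgCheck` are made up for illustration, and Kryo's real instantiation logic is more involved than this.

```java
import java.lang.reflect.Constructor;

// Hypothetical class that CAN be re-created empty and then populated.
class WithNoArg {
  String url;
  WithNoArg() { }                               // no-arg constructor used on restore
  WithNoArg(String url) { this.url = url; }
}

// Hypothetical class that CANNOT be re-created empty: its only constructor needs an argument.
class WithoutNoArg {
  final String url;
  WithoutNoArg(String url) { this.url = url; }
}

class NoArgCheck {
  // Returns true if an empty instance can be created via a no-arg constructor.
  static boolean canCreateEmpty(Class<?> type) {
    try {
      Constructor<?> c = type.getDeclaredConstructor();  // throws NoSuchMethodException if absent
      c.setAccessible(true);                             // private no-arg constructors are fine too
      c.newInstance();
      return true;
    } catch (ReflectiveOperationException e) {
      return false;
    }
  }
}
```

Under this sketch, `NoArgCheck.canCreateEmpty(WithoutNoArg.class)` returns false, which corresponds to the "missing no-arg constructor" situation; adding a (possibly private) no-arg constructor is the usual way out.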

Please see the section 

Re: how to increase lifetime of hdfs delegation tokens ?

2016-06-20 Thread Raja . Aravapalli

Sure Pramod. Please respond on this mail chain when you get to know..

Thanks very much.


Regards,
Raja.

From: Pramod Immaneni >
Reply-To: "users@apex.apache.org" 
>
Date: Monday, June 20, 2016 at 4:54 PM
To: "users@apex.apache.org" 
>
Subject: Re: how to increase lifetime of hdfs delegation tokens ?

Raja,

I believe it would. I will check and get back to you but the easiest way for 
you to check is that the file should appear in HDFS under 
/user//datatorrent with the same filename as it is in your local 
filesystem.

Thanks

On Mon, Jun 20, 2016 at 2:40 PM, Raja.Aravapalli 
> wrote:

Thanks for the response Pramod.

My quick question is, I see we should mention these properties in dt-site.xml 
!! I am not sure about dt-site.xml; all I am using is a properties.xml file, 
which I use to pass some configuration to the application.
Can I set these in the properties.xml file and will it still work ?


Regards,
Raja.

From: Pramod Immaneni >
Reply-To: "users@apex.apache.org" 
>
Date: Monday, June 20, 2016 at 4:32 PM

To: "users@apex.apache.org" 
>
Subject: Re: how to increase lifetime of hdfs delegation tokens ?

Hi Raja,

Yes the keytab would be copied over to HDFS and reused for getting a new token 
before the old one expires. By default it is 7 days. If it is different in your 
cluster please set the properties 
dt.resourcemanager.delegation.token.max-lifetime and 
dt.namenode.delegation.token.max-lifetime in dt-site.xml. Also if you don't want the 
default keytab to be copied over into HDFS and reused you can specify your own 
keytab file for fetching a new token by putting it in HDFS and specifying the 
property dt.authentication.store.keytab. All this is described in the document 
that Thomas sent over.

Thanks

On Mon, Jun 20, 2016 at 1:54 PM, Raja.Aravapalli 
> wrote:

Hi Thomas,

To ensure auto renewal of the delegation token lifetime, can I use the below 
properties in the properties.xml file ?



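<property>
  <name>dt.authentication.principal</name>
  <value>kerberos-principal-of-user</value>
</property>
<property>
  <name>dt.authentication.keytab</name>
  <value>absolute-path-to-keytab-file</value>
</property>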


FYI,
I am launching application from Apex CLI! And till this time I haven’t used the 
above properties when launching apex applications in our secure hadoop 
environment, still they worked fine without any issues, but failing after 
7days!!

If I set the above properties in properties.xml, will that do auto-renewal and 
run successfully without any issues of failing again due to delegation token 
lifetime expiry ??

Please advise.


Thanks a lot in advance.


Regards,
Raja.

From: "Raja.Aravapalli" 
>
Reply-To: "users@apex.apache.org" 
>
Date: Sunday, June 19, 2016 at 3:30 PM

To: "users@apex.apache.org" 
>
Subject: Re: how to increase lifetime of hdfs delegation tokens ?


Thanks a lot Thomas.

Will take this as reference and test our application. Great!


Regards,
Raja.

From: Thomas Weise >
Reply-To: "users@apex.apache.org" 
>
Date: Sunday, June 19, 2016 at 2:01 PM
To: "users@apex.apache.org" 
>
Subject: Re: how to increase lifetime of hdfs delegation tokens ?

Token expiration working as expected!

Please have a look on how to extend or refresh it:

https://github.com/apache/apex-core/blob/master/docs/security.md#token-refresh

Thanks,
Thomas


On Sat, Jun 18, 2016 at 10:26 PM, Raja.Aravapalli 
> wrote:

Hi,

My Apex application failed exactly after running 7days in our distributed 
hadoop environment, with delegation token expiry!!

Can someone pls help me with details, on how I can increase the delegation 
token time to lifetime or any other process running in parallel to renew the 
tokens ?

Exception details below:


ERROR hdfs.DFSClient (DFSClient.java:closeAllFilesBeingWritten(954)) - Failed 
to close inode 
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
 token (HDFS_DELEGATION_TOKEN token 11 for XX) is expired
   

Re: how to pass arguments ?

2016-06-15 Thread Raja . Aravapalli

Thanks Sandesh.


Regards,
Raja.

From: Sandesh Hegde >
Reply-To: "users@apex.apache.org" 
>
Date: Wednesday, June 15, 2016 at 11:20 AM
To: "users@apex.apache.org" 
>
Subject: Re: how to pass arguments ?

Hi Raja,

Take look at the following example,

Redis URL is coming from the configuration file and accessed in the 
"Application.java"

https://github.com/DataTorrent/streaming-benchmarks/blob/master/apex-benchmarks/src/main/java/apex/benchmark/ApplicationWithGenerator.java

Thanks



On Wed, Jun 15, 2016 at 9:00 AM Raja.Aravapalli 
> wrote:

Hi,

How can I pass arguments to Application.java class ? Like how we pass (String[] 
args) in main method in java ?

Couldn’t find any reference. Can someone please share knowledge on this ? 
Thanks a lot.



Regards,
Raja.


Re: kafka input is processing records in a jumbled order

2016-06-11 Thread Raja . Aravapalli

Thanks Ashwin for sharing your thoughts.

Sure, I will work towards contributing. But, I am very new to this. May take a 
good amount of time for me to come with right design for operator.

I still haven't been able to find the right lag time for my application, because 
sometimes a few messages still arrive later than expected (maybe the network 
is too slow)!!

However, with the introduction of lag, I was able to significantly decrease the 
number of records that were missing compared to earlier.

Thanks team for sharing your thoughts :)



-Regards,
Raja.


From: Ashwin Chandra Putta 
>
Reply-To: "users@apex.apache.org" 
>
Date: Friday, June 10, 2016 at 5:24 PM
To: "users@apex.apache.org" 
>
Subject: Re: kafka input is processing records in a jumbled order

Raja,

Great discussion, just a couple more thoughts.

1. Add this operator to your DAG just before the operator which expects the 
tuples to be in order. If you order the tuples right after kafka input and then 
you have multiple partitions of a downstream operator, then the order will be 
lost again. So better to order only at the point in the DAG just before where 
it is required.
2. Seems like you are building this operator for your use case, it will be 
useful for a lot of other use cases too. So if possible, can you contribute the 
operator back to malhar? That way we can make it available for other to use too 
and collectively enhance it as needed.

Regards,
Ashwin.

On Thu, Jun 9, 2016 at 2:43 PM, Raja.Aravapalli 
> wrote:

Great.

I will explore. Thanks for your inputs.


Regards,
Raja.

From: Munagala Ramanath >
Reply-To: "users@apex.apache.org" 
>
Date: Thursday, June 9, 2016 at 3:38 PM

To: "users@apex.apache.org" 
>
Subject: Re: kafka input is processing records in a jumbled order

Any fields within an operator that are not declared "transient" are considered 
part of the operator state and
are checkpointed; on failure, they are restored from the checkpoint 
automatically by the platform.
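
To make the effect of `transient` concrete, here is a minimal sketch using plain `java.io` serialization. This is only an analogy: as far as I know Apex checkpoints operators with Kryo by default rather than Java serialization, but both skip `transient` fields. The class `CounterState` and its fields are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class CounterState implements Serializable {
  private static final long serialVersionUID = 1L;

  long processed;                   // non-transient: saved and restored
  transient StringBuilder scratch;  // transient: not saved, null after restore

  // Serializes the object to bytes and reads it back, simulating save + restore.
  static CounterState roundTrip(CounterState in) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(in);
      }
      try (ObjectInputStream ois =
               new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (CounterState) ois.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

After a round trip, `processed` keeps its value while `scratch` comes back as null, so anything held in a transient field has to be re-created after recovery (in an Apex operator that would typically happen in `setup()`).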

Ram

On Thu, Jun 9, 2016 at 1:04 PM, Raja.Aravapalli 
> wrote:

Really a great thought!!

Wondering how will application handle the failures ?

Also, what does this phrase mean “hold them in the operator state” ?  Hold 
incoming messages in some data structure, map etc ?

-Regards,
Raja.

From: Thomas Weise >
Reply-To: "users@apex.apache.org" 
>
Date: Thursday, June 9, 2016 at 2:57 PM
To: "users@apex.apache.org" 
>
Subject: Re: kafka input is processing records in a jumbled order

You can order the messages by event time as they arrive, hold them in the 
operator state and then only emit those in the endWindow callback that are 
older than  (note that you are not simply emitting all, only those 
that are old enough).
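
A minimal sketch of such a buffer, in plain Java with no Apex dependency. The class name `ReorderBuffer`, the `lagMillis` threshold and the use of `String` messages are assumptions made for illustration; in a real operator the buffer would be a non-transient field, `add` would be called from the input port's process method, and `drainOlderThan` from `endWindow()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class ReorderBuffer {
  private final long lagMillis;
  // event timestamp -> messages carrying that timestamp, kept sorted by timestamp
  private final TreeMap<Long, List<String>> pending = new TreeMap<>();

  ReorderBuffer(long lagMillis) {
    this.lagMillis = lagMillis;
  }

  // Buffer an incoming message under its event time.
  void add(long eventTime, String msg) {
    pending.computeIfAbsent(eventTime, k -> new ArrayList<>()).add(msg);
  }

  // Release, in timestamp order, only the messages whose event time is at or
  // before (nowMillis - lagMillis); newer ones stay buffered for a later window.
  List<String> drainOlderThan(long nowMillis) {
    List<String> out = new ArrayList<>();
    Map<Long, List<String>> ready = pending.headMap(nowMillis - lagMillis, true);
    for (List<String> msgs : ready.values()) {
      out.addAll(msgs);
    }
    ready.clear();  // headMap is a view, so this also removes the entries from pending
    return out;
  }
}
```

With a lag of 10, if msg2 (ts 2) arrives in one window and msg1 (ts 1) only in the next, draining at time 5 emits nothing, and draining at time 12 emits msg1 followed by msg2. This only works when the lag is larger than the worst-case delay of a late message.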

Thomas

On Thu, Jun 9, 2016 at 12:22 PM, Raja.Aravapalli 
> wrote:

So,  you want me to add all the incoming tuples into a Map, and complete the 
processing in endwindow() ??

How would this solve my problem as described below with windowing.


msg2 ts2
—— window ends here
msg1 ts1
msg3 ts3
msg4 ts4
msg5 ts5
msg7 ts7
msg6 ts6
—— window ends here

Thanks a lot for your inputs. Your thoughts are valuable!!



Regards,
Raja.

From: Thomas Weise >
Reply-To: "users@apex.apache.org" 
>
Date: Thursday, June 9, 2016 at 1:49 PM

To: "users@apex.apache.org" 
>
Subject: Re: kafka input is processing records in a jumbled order


There is no need for an extra thread. In fact, tuples should be emitted in the 
operator thread only. This can be done in endWindow()

--
sent from mobile

On Jun 9, 2016 11:46 AM, "Sandesh Hegde" 
> wrote:

How about something like this,

Store the incoming tuples in the following format:
   TreeMap

Create a Flusher thread, which periodically flushes the *firstKey*, after 
considering the lag.


On Thu, Jun 9, 2016 at 11:09 AM Munagala Ramanath 
> wrote:
You'll need to 

Re: kafka input is processing records in a jumbled order

2016-06-09 Thread Raja . Aravapalli

So,  you want me to add all the incoming tuples into a Map, and complete the 
processing in endwindow() ??

How would this solve my problem as described below with windowing.


msg2 ts2
—— window ends here
msg1 ts1
msg3 ts3
msg4 ts4
msg5 ts5
msg7 ts7
msg6 ts6
—— window ends here

Thanks a lot for your inputs. Your thoughts are valuable!!



Regards,
Raja.

From: Thomas Weise >
Reply-To: "users@apex.apache.org" 
>
Date: Thursday, June 9, 2016 at 1:49 PM
To: "users@apex.apache.org" 
>
Subject: Re: kafka input is processing records in a jumbled order


There is no need for an extra thread. In fact, tuples should be emitted in the 
operator thread only. This can be done in endWindow()

--
sent from mobile

On Jun 9, 2016 11:46 AM, "Sandesh Hegde" 
> wrote:

How about something like this,

Store the incoming tuples in the following format:
   TreeMap

Create a Flusher thread, which periodically flushes the *firstKey*, after 
considering the lag.


On Thu, Jun 9, 2016 at 11:09 AM Munagala Ramanath 
> wrote:
You'll need to have some limit on how much lag is possible for out-of-order 
messages.
If that limit is, say, 30s, then you'll need to buffer tuples for double the lag 
-- 60s.

You can configure the Application Window size suitably to do this.
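
As a rough worked example of the arithmetic: the helper below (hypothetical name `WindowMath`) computes how many streaming windows are needed to cover a given buffering time. The 500 ms streaming window used in the usage note is what I believe the Apex default to be, and the operator attribute that takes the resulting count is, to my knowledge, APPLICATION_WINDOW_COUNT; please verify both against your Apex version.

```java
class WindowMath {
  // Smallest number of streaming windows whose total duration covers bufferMillis.
  static int windowsFor(long bufferMillis, long streamingWindowMillis) {
    return (int) ((bufferMillis + streamingWindowMillis - 1) / streamingWindowMillis);
  }
}
```

For a 30s lag limit the buffer is 60s, so `WindowMath.windowsFor(2 * 30_000L, 500L)` gives 120 streaming windows per application window.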

Ram

On Thu, Jun 9, 2016 at 10:40 AM, Raja.Aravapalli 
> wrote:

No aggregation, but I need messages to be played in sequence !!


Ex:

Below is the way actually msgs should come from my kafka topic

msg1 ts1
msg2 ts2
msg3 ts3
msg4 ts4
msg5 ts5
msg6 ts6
msg7 ts7


But, due to some network issues I am seeing the messages in kafka topic 
something like below:

msg2 ts2  ==> msg2 which actually should come after msg1, but unfortunately 
msg2 is coming to kafka before msg1, losing the sequence!!
msg1 ts1 ==> delayed by few milli secs to seconds to reach on time!!
msg3 ts3
msg4 ts4
msg5 ts5
msg7 ts7 ==> msg7 had come early into topics before msg6
msg6 ts6 ==> delayed !!


I am losing the order of messages and business logic gives correct results only 
when msgs played in sequence!!

Now if I define windowing/some buffering and then order on timestamp and play 
msgs…

What if window boundary takes

msg2 ts2
—— window ends here
msg1 ts1
msg3 ts3
msg4 ts4
msg5 ts5
msg7 ts7
msg6 ts6
—— window ends here

Now, if you see, even though I am trying to do buffering and then ordering the 
msgs based on some timestamp, I still face the problem of msg2 already processed 
before msg1 !! Which I don’t want.

Did I really understand windowing correctly…. Pls correct me if I am wrong!! 
Thanks for your thoughts!!


Regards,
Raja.

From: Thomas Weise >
Reply-To: "users@apex.apache.org" 
>
Date: Thursday, June 9, 2016 at 10:51 AM
To: "users@apex.apache.org" 
>
Subject: Re: kafka input is processing records in a jumbled order

Apex can do stateful processing, you can define a window in which you can 
reorder the messages. It will have the same effect on latency as 
"micro-batching".

Why is the ordering important? What operations do you perform on the data? 
Aggregation?

Thanks,
Thomas


On Thu, Jun 9, 2016 at 8:23 AM, Raja.Aravapalli 
> wrote:

My bad… we observed that our source data in kafka topics is not really in an ordered 
fashion; we are seeing messages delayed by a few milliseconds!!

Source couldn’t ensure the ordering guarantee due to the network!!

Is there a right way for me from consumer standpoint, I can ensure ordering ?? 
Will micro batching work for me here ? Or Does apex support micro batching and 
order the messages ?



Regards,
Raja

From: Thomas Weise >
Reply-To: "users@apex.apache.org" 
>
Date: Tuesday, June 7, 2016 at 10:59 PM

To: "users@apex.apache.org" 
>
Subject: Re: kafka input is processing records in a jumbled order

Raja,

Please also confirm how you are using partitioning. If, for example, in your 
DAG you shuffle the data received from Kafka in a way that is different from 
the original partitioning, then it would be possible that multiple downstream 
partitions process data that came from a single Kafka partition concurrently 
and therefore in a 

Re: kafka input is processing records in a jumbled order

2016-06-09 Thread Raja . Aravapalli

No aggregation, but I need messages to be played in sequence !!


Ex:

Below is the way actually msgs should come from my kafka topic

msg1 ts1
msg2 ts2
msg3 ts3
msg4 ts4
msg5 ts5
msg6 ts6
msg7 ts7


But, due to some network issues I am seeing the messages in kafka topic 
something like below:

msg2 ts2  ==> msg2 which actually should come after msg1, but unfortunately 
msg2 is coming to kafka before msg1, losing the sequence!!
msg1 ts1 ==> delayed by few milli secs to seconds to reach on time!!
msg3 ts3
msg4 ts4
msg5 ts5
msg7 ts7 ==> msg7 had come early into topics before msg6
msg6 ts6 ==> delayed !!


I am losing the order of messages and business logic gives correct results only 
when msgs played in sequence!!

Now if I define windowing/some buffering and then order on timestamp and play 
msgs…

What if window boundary takes

msg2 ts2
—— window ends here
msg1 ts1
msg3 ts3
msg4 ts4
msg5 ts5
msg7 ts7
msg6 ts6
—— window ends here

Now, if you see, even though I am trying to do buffering and then ordering the 
msgs based on some timestamp, I still face the problem of msg2 already processed 
before msg1 !! Which I don’t want.

Did I really understand windowing correctly…. Pls correct me if I am wrong!! 
Thanks for your thoughts!!


Regards,
Raja.

From: Thomas Weise >
Reply-To: "users@apex.apache.org" 
>
Date: Thursday, June 9, 2016 at 10:51 AM
To: "users@apex.apache.org" 
>
Subject: Re: kafka input is processing records in a jumbled order

Apex can do stateful processing, you can define a window in which you can 
reorder the messages. It will have the same effect on latency as 
"micro-batching".

Why is the ordering important? What operations do you perform on the data? 
Aggregation?

Thanks,
Thomas


On Thu, Jun 9, 2016 at 8:23 AM, Raja.Aravapalli 
> wrote:

My bad… we observed that our source data in kafka topics is not really in an ordered 
fashion; we are seeing messages delayed by a few milliseconds!!

Source couldn’t ensure the ordering guarantee due to the network!!

Is there a right way for me from consumer standpoint, I can ensure ordering ?? 
Will micro batching work for me here ? Or Does apex support micro batching and 
order the messages ?



Regards,
Raja

From: Thomas Weise >
Reply-To: "users@apex.apache.org" 
>
Date: Tuesday, June 7, 2016 at 10:59 PM

To: "users@apex.apache.org" 
>
Subject: Re: kafka input is processing records in a jumbled order

Raja,

Please also confirm how you are using partitioning. If, for example, in your 
DAG you shuffle the data received from Kafka in a way that is different from 
the original partitioning, then it would be possible that multiple downstream 
partitions process data that came from a single Kafka partition concurrently 
and therefore in a different order.

Thomas


On Tue, Jun 7, 2016 at 6:33 PM, Raja.Aravapalli 
> wrote:

Yes Devendra.

p1.10 is read before p1.1 !!

Sure I shall check that. Thanks a lot for your response.


Regards,
Raja.

From: Devendra Tagare 
>
Reply-To: "users@apex.apache.org" 
>
Date: Tuesday, June 7, 2016 at 7:59 PM

To: "users@apex.apache.org" 
>
Subject: Re: kafka input is processing records in a jumbled order

Hi Raja,

Just to be clear are you suggesting that p1.10 is being read before p1.1 ?

If that's the case, can you use the console consumer that comes packaged with kafka 
and verify the ordering based on timestamps ?

Thanks,
Dev



On Tue, Jun 7, 2016 at 5:31 PM, Raja.Aravapalli 
> wrote:

Thanks a lot Devendra Tagare for the response.

What you said is very clear and understandable. But, wondering, I am NOT 
getting that partition level order!! My operator is processing the records in 
jumbled order rather than in sequence!
And I am saying this because I am generating timestamps upon tuple receipt 
and emitting that timestamp to my destination, which clearly shows the 
records are reaching the operator in a shuffled order.

I get records at milli second level differences!! Will that be a problem ?


Regards,
Raja.

From: Devendra Tagare 
>
Reply-To: "users@apex.apache.org" 

Re: kafka input is processing records in a jumbled order

2016-06-07 Thread Raja . Aravapalli

Thanks a lot Devendra Tagare for the response.

What you said is very clear and understandable. But, wondering, I am NOT 
getting that partition level order!! My operator is processing the records in 
jumbled order rather than in sequence!
And I am saying this because I am generating timestamps upon tuple receipt 
and emitting that timestamp to my destination, which clearly shows the 
records are reaching the operator in a shuffled order.

I get records at milli second level differences!! Will that be a problem ?


Regards,
Raja.

From: Devendra Tagare 
>
Reply-To: "users@apex.apache.org" 
>
Date: Tuesday, June 7, 2016 at 7:12 PM
To: "users@apex.apache.org" 
>
Subject: Re: kafka input is processing records in a jumbled order

Hi Raja,

When you apply ONE_TO_MANY partitioning scheme, one instance of the operator 
consumes from many partitions of a kafka topic.

When you look at the consumed data, all the events coming from a given 
partition would be ordered, but there are no ordering guarantees across 
partitions since kafka does not guarantee that.

e.g.: If 3 partitions of a topic (p1, p2, p3) with 10 messages each are connected 
to one physical partition of the KafkaInputOperator, then the ordering 
guarantee of p1.1 to p1.10 is honored, i.e. message 10 of p1 will be 
consumed only after messages 1 through 9 are consumed, but the operator could 
consume messages in an order like p1.1, p2.1, p1.2, p1.3, p3.1, p2.2, which 
still follows the per-partition guarantees.
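
One way to check this on consumed data is sketched below in plain Java. It assumes each record has been labelled as "partition.sequence" (for example "p1.3"), which is just the notation from the example above and not something the Kafka operator produces by itself; the class name `PartitionOrder` is hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class PartitionOrder {
  // Returns true if, within every partition, sequence numbers only ever increase.
  // Interleaving between different partitions is allowed.
  static boolean perPartitionOrdered(List<String> consumed) {
    Map<String, Integer> lastSeen = new HashMap<>();
    for (String entry : consumed) {
      int dot = entry.indexOf('.');
      String partition = entry.substring(0, dot);
      int seq = Integer.parseInt(entry.substring(dot + 1));
      Integer prev = lastSeen.put(partition, seq);   // previous sequence for this partition, if any
      if (prev != null && seq <= prev) {
        return false;                                // a record overtook an earlier one
      }
    }
    return true;
  }
}
```

The interleaved order from the example passes this check, whereas a sequence in which p1.10 shows up before p1.1 fails it and would point to a real ordering problem upstream.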

Thanks,
Dev

On Tue, Jun 7, 2016 at 5:00 PM, Raja.Aravapalli 
> wrote:

Thanks for the response Thomas.

My quick doubt is..

I have around 30 partitions of kafka topic, And all of them have messages 
ordered at partition level.

So, when I consume those messages using single consumer[with ONE_TO_MANY 
strategy set], still the ordering doesn’t work ?


My messages in topic are guaranteed to be ordered at partition level.

Thanks a lot in advance for your response.


Regards,
Raja.

From: Thomas Weise >
Reply-To: "users@apex.apache.org" 
>
Date: Tuesday, June 7, 2016 at 5:52 PM
To: "users@apex.apache.org" 
>
Subject: Re: kafka input is processing records in a jumbled order

Raja,

Are you expecting ordering across multiple Kafka partitions?

All messages from a given Kafka partition are received by the same consumer and 
thus will be ordered. However, when messages come from multiple partitions 
there is no such guarantee.

Thomas


On Tue, Jun 7, 2016 at 3:34 PM, Raja.Aravapalli 
> wrote:

Hi

I have built a DAG, that reads from kafka and in the next operators, does 
lookup to a hbase table and update hbase table based on some business logic.

Some times my operator which does hbase lookup and update in the same 
operator(Custom written), is processing the records it receives from kafka in a 
jumbled order, which is causing, many records being ignored from processing!!

I am not using any parallel partitions/instance, and with KafkaInputOperator I 
am using only partition strategy ONE_TO_MANY.

I am very new to Apex. I expected, Apex will guarantee the ordering.

Can someone pls share your knowledge on the issue…?


Thanks a lot in advance…


Regards,
Raja.




Re: kafka input is processing records in a jumbled order

2016-06-07 Thread Raja . Aravapalli

Thanks for the response Thomas.

My quick doubt is..

I have around 30 partitions of kafka topic, And all of them have messages 
ordered at partition level.

So, when I consume those messages using single consumer[with ONE_TO_MANY 
strategy set], still the ordering doesn’t work ?


My messages in topic are guaranteed to be ordered at partition level.

Thanks a lot in advance for your response.


Regards,
Raja.

From: Thomas Weise >
Reply-To: "users@apex.apache.org" 
>
Date: Tuesday, June 7, 2016 at 5:52 PM
To: "users@apex.apache.org" 
>
Subject: Re: kafka input is processing records in a jumbled order

Raja,

Are you expecting ordering across multiple Kafka partitions?

All messages from a given Kafka partition are received by the same consumer and 
thus will be ordered. However, when messages come from multiple partitions 
there is no such guarantee.

Thomas


On Tue, Jun 7, 2016 at 3:34 PM, Raja.Aravapalli 
> wrote:

Hi

I have built a DAG, that reads from kafka and in the next operators, does 
lookup to a hbase table and update hbase table based on some business logic.

Some times my operator which does hbase lookup and update in the same 
operator(Custom written), is processing the records it receives from kafka in a 
jumbled order, which is causing, many records being ignored from processing!!

I am not using any parallel partitions/instance, and with KafkaInputOperator I 
am using only partition strategy ONE_TO_MANY.

I am very new to Apex. I expected, Apex will guarantee the ordering.

Can someone pls share your knowledge on the issue…?


Thanks a lot in advance…


Regards,
Raja.



kafka input is processing records in a jumbled order

2016-06-07 Thread Raja . Aravapalli

Hi

I have built a DAG, that reads from kafka and in the next operators, does 
lookup to a hbase table and update hbase table based on some business logic.

Some times my operator which does hbase lookup and update in the same 
operator(Custom written), is processing the records it receives from kafka in a 
jumbled order, which is causing, many records being ignored from processing!!

I am not using any parallel partitions/instance, and with KafkaInputOperator I 
am using only partition strategy ONE_TO_MANY.

I am very new to Apex. I expected, Apex will guarantee the ordering.

Can someone pls share your knowledge on the issue…?


Thanks a lot in advance…


Regards,
Raja.


hbase

2016-06-07 Thread Raja . Aravapalli

Hi

I am trying to connect to hbase and do lookup as well update a table in hbase 
in one of my operators!!

When I include the HBase dependencies and run the application, the application 
fails!! Can someone please share your thoughts and help me fix this ?

My application is failing with below error:


Exception in thread "main" java.lang.IllegalArgumentException: Invalid 
ContainerId: container_e3059_1463086279244_52690_02_01
at 
org.apache.hadoop.yarn.util.ConverterUtils.toContainerId(ConverterUtils.java:182)
at 
com.datatorrent.stram.StreamingAppMaster.main(StreamingAppMaster.java:91)
Caused by: java.lang.NumberFormatException: For input string: "e3059"
at 
java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Long.parseLong(Long.java:589)
at java.lang.Long.parseLong(Long.java:631)
at 
org.apache.hadoop.yarn.util.ConverterUtils.toApplicationAttemptId(ConverterUtils.java:137)
at 
org.apache.hadoop.yarn.util.ConverterUtils.toContainerId(ConverterUtils.java:177)
... 1 more



POM.xml FYR



http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
  4.0.0

  com..apextests
  1.0-SNAPSHOT
  firstapexapp
  jar

  
  My Apex Application
  My Apex Application Description

  

3.5.0-SNAPSHOT
lib/*.jar
2.7.1.2.3.4.0-3485
1.1.2.2.3.4.0-3485
  

  

   
 org.apache.maven.plugins
 maven-eclipse-plugin
 2.9
 
   true
 
   
   
 maven-compiler-plugin
 3.3
 
   UTF-8
   1.7
   1.7
   true
   false
   true
   true
 
   
   
 maven-dependency-plugin
 2.8
 
   
 copy-dependencies
 prepare-package
 
   copy-dependencies
 
 
   target/deps
   runtime
 
   
 
   

   
 maven-assembly-plugin
 
   
 app-package-assembly
 package
 
   single
 
 
   
${project.artifactId}-${project.version}-apexapp
   false
   
 src/assemble/appPackage.xml
   
   
 0755
   
   
 
   ${apex.apppackage.classpath}
   ${apex.version}
   
${project.groupId}
   
${project.artifactId}
   
${project.version}
   
${project.name}
   
${project.description}
 
   
 
   
 
   

   
 maven-antrun-plugin
 1.7
 
   
 package
 
   
 
   
 
 
   run
 
   
   
 
 createJavadocDirectory
 generate-resources
 
   
 
 
   
 
 
   run
 
   
 
   

   
 org.codehaus.mojo
 build-helper-maven-plugin
 1.9.1
 
   
 attach-artifacts
 package
 
   attach-artifact
 
 
   
 
   
target/${project.artifactId}-${project.version}.apa
   apa
 
   
   false
 
   
 
   



  

  

  HDPReleases
  HDP Releases
  http://repo.hortonworks.com/content/repositories/releases/
  default


  HDP Jetty Hadoop
  HDP Jetty Hadoop
  http://repo.hortonworks.com/content/repositories/jetty-hadoop/
  default


  confluent
  http://packages.confluent.io/maven

  

  


  org.apache.apex
  malhar-library
  3.4.0
  
  


  org.apache.apex
  apex-common
  ${apex.version}
  provided


  junit
  junit
  4.10
  test


  org.apache.apex
  apex-engine
  ${apex.version}
  test




  org.apache.apex
  malhar-contrib
  3.4.0




  org.apache.apex
  malhar-kafka
  3.4.0



  org.apache.avro
  avro
  1.7.7



  org.apache.kafka
  kafka_2.10
  0.8.1.1



  io.confluent
  kafka-avro-serializer
  1.0
  

  log4j
  log4j


  org.slf4j
  slf4j-log4j12

Re: kafka offset commit

2016-06-06 Thread Raja . Aravapalli

Thanks Siyuan.

So, to confirm, Apex is not storing the offset status at any location ? Like 
how Storm stores it in Zookeeper ?


Regards,
Raja.

From: "hsy...@gmail.com" 
>
Reply-To: "users@apex.apache.org" 
>
Date: Monday, June 6, 2016 at 6:42 PM
To: "users@apex.apache.org" 
>
Subject: Re: kafka offset commit

Hey Raja,

For 0.8, you have to implement the OffsetManager interface on your own. The 
updateOffsets method will be called in the application master every time it gets 
updated offsets from each physical partition. The offsets that you see in 
the method are committed offsets, so you can safely save them into 
either zookeeper (the 0.8.2 client has an API to do that) or any other datastore like 
a DB or HDFS. You also have to implement the method loadInitialOffsets to 
load back the offsets you want.
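
A rough, self-contained sketch of the save/load idea follows. It does not use the real Malhar interface: `OffsetStore` is a local stand-in with the two method names mentioned above, partitions are keyed by a plain String instead of Malhar's partition type, and offsets go to a local properties file rather than zookeeper, a DB or HDFS. All of that is assumed for illustration only.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Stand-in for illustration; NOT the Malhar OffsetManager interface itself.
interface OffsetStore {
  Map<String, Long> loadInitialOffsets();
  void updateOffsets(Map<String, Long> offsetsOfPartitions);
}

class FileOffsetStore implements OffsetStore {
  private final Path file;

  FileOffsetStore(Path file) {
    this.file = file;
  }

  @Override
  public Map<String, Long> loadInitialOffsets() {
    Map<String, Long> offsets = new HashMap<>();
    if (!Files.exists(file)) {
      return offsets;                       // first launch: nothing saved yet
    }
    Properties props = new Properties();
    try (InputStream in = Files.newInputStream(file)) {
      props.load(in);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    for (String key : props.stringPropertyNames()) {
      offsets.put(key, Long.parseLong(props.getProperty(key)));
    }
    return offsets;
  }

  @Override
  public void updateOffsets(Map<String, Long> offsetsOfPartitions) {
    Map<String, Long> merged = loadInitialOffsets();   // keep partitions not in this update
    merged.putAll(offsetsOfPartitions);
    Properties props = new Properties();
    for (Map.Entry<String, Long> entry : merged.entrySet()) {
      props.setProperty(entry.getKey(), Long.toString(entry.getValue()));
    }
    try (OutputStream out = Files.newOutputStream(file)) {
      props.store(out, "committed offsets");
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

A relaunched application would construct the store over the same file and call `loadInitialOffsets()` to resume from the last saved positions. A production version would need a store that every node in the cluster can reach.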

You are welcome to contribute a default implementation of OffsetManager using the 
built-in kafka offset commit request API!

Regards,
Siyuan

On Mon, Jun 6, 2016 at 3:36 PM, Raja.Aravapalli 
> wrote:

Hi Thomas,

We are using 0.8 cluster still!!


Regards,
Raja.

From: Thomas Weise >
Reply-To: "users@apex.apache.org" 
>
Date: Monday, June 6, 2016 at 5:23 PM
To: "users@apex.apache.org" 
>
Subject: Re: kafka offset commit

Hi Raja,

Which Kafka version are you using?

With the new 0.9 connector there is no need for the offset manager:

https://github.com/apache/apex-malhar/tree/master/kafka/src/main/java/org/apache/apex/malhar/kafka

Thanks,
Thomas


On Mon, Jun 6, 2016 at 3:06 PM, Raja.Aravapalli 
> wrote:
Hi

Can someone please help me understand, where will the offsets be stored when 
consuming with “KafkaSinglePortStringInputOperator”  ?

And, how to handle restarts ?


I worked with Storm earlier, Storm maintains the offsets in zookeeper and 
client id is maintained for every consumer, using which

- we can see what is the current offset status for a given partition & modify 
them as well using zookeeper-cli !!
- restarts can be handled


As per the Apex documentation, I can see, that using OffsetManager we can 
handle the restarts effectively, but couldn’t find any examples to refer…

How clientId can be used to retrieve offsets status
And ability to edit the offsets etc

can someone pls help me find this ?


Thanks a lot!!


-Regards,
Raja.







Re: kafka offset commit

2016-06-06 Thread Raja . Aravapalli

Hi Thomas,

We are using 0.8 cluster still!!


Regards,
Raja.

From: Thomas Weise >
Reply-To: "users@apex.apache.org" 
>
Date: Monday, June 6, 2016 at 5:23 PM
To: "users@apex.apache.org" 
>
Subject: Re: kafka offset commit

Hi Raja,

Which Kafka version are you using?

With the new 0.9 connector there is no need for the offset manager:

https://github.com/apache/apex-malhar/tree/master/kafka/src/main/java/org/apache/apex/malhar/kafka

Thanks,
Thomas


On Mon, Jun 6, 2016 at 3:06 PM, Raja.Aravapalli 
> wrote:
Hi

Can someone please help me understand, where will the offsets be stored when 
consuming with “KafkaSinglePortStringInputOperator”  ?

And, how to handle restarts ?


I worked with Storm earlier, Storm maintains the offsets in zookeeper and 
client id is maintained for every consumer, using which

- we can see what is the current offset status for a given partition & modify 
them as well using zookeeper-cli !!
- restarts can be handled


As per the Apex documentation, I can see, that using OffsetManager we can 
handle the restarts effectively, but couldn’t find any examples to refer…

How clientId can be used to retrieve offsets status
And ability to edit the offsets etc

can someone pls help me find this ?


Thanks a lot!!


-Regards,
Raja.






avro deserialization fails when using kafka

2016-06-06 Thread Raja . Aravapalli

Hi,

I am trying to read data from Kafka, and my input in Kafka is Avro messages.

So I am using the class “KafkaSinglePortByteArrayInputOperator” to emit records 
from Kafka. In the next operator I read the input as "byte[]" and 
deserialize the message.

But the tuple deserialization is failing with the error below in the log.

Can someone please share your thoughts and help me fix this?



Caused by: com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): io.confluent.kafka.serializers.KafkaAvroDecoder
Serialization trace:
decoder (com.tgt.mighty.apexapps.AvroBytesConversionOperator)
    at com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
    at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
    at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)



Code FYR:


Application.java file:

public void populateDAG(DAG dag, Configuration conf)
{
  //KafkaSinglePortStringInputOperator kafkaInput = dag.addOperator("Kafka_Input", KafkaSinglePortStringInputOperator.class);

  KafkaSinglePortByteArrayInputOperator kafkaInput = dag.addOperator("Kafka_Input", new KafkaSinglePortByteArrayInputOperator());

  AvroBytesConversionOperator avroConversion = dag.addOperator("Avro_Convert", new AvroBytesConversionOperator("schemaRegURL"));

  HDFSWrite hdfs = dag.addOperator("To_HDFS", HDFSWrite.class);

  //dag.addStream("Kafka_To_Hdfs_Ingestion", kafkaInput.outputPort, hdfs.input);
  dag.addStream("Kafka_Avro_Msg_Byte_Stream", kafkaInput.outputPort, avroConversion.input);
  dag.addStream("Avro_To_String_Stream", avroConversion.output, hdfs.input);
}


Operator Code:

public class AvroBytesConversionOperator extends BaseOperator {

  private String schemaRegURL;
  private KafkaAvroDecoder decoder;

  public AvroBytesConversionOperator(){

  }

  public AvroBytesConversionOperator(String schemaRegURL){
    this.schemaRegURL = schemaRegURL;
  }

  /**
   * Defines Input Port - DefaultInputPort
   * Accepts data from the upstream operator
   * Type byte[]
   */
  public transient DefaultInputPort<byte[]> input = new DefaultInputPort<byte[]>() {
    @Override
    public void process(byte[] tuple)
    {
      processTuple(tuple);
    }
  };


  /**
   * Defines Output Port - DefaultOutputPort
   * Sends data to the down stream operator which can consume this data
   * Type String
   */
  public transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();


  /**
   * Setup call
   */
  @Override
  public void setup(OperatorContext context)
  {
    Properties props = new Properties();
    props.setProperty("schema.registry.url", this.schemaRegURL);
    this.decoder = new KafkaAvroDecoder(new VerifiableProperties(props));
  }

  /**
   * Begin window call for the operator.
   * @param windowId
   */
  public void beginWindow(long windowId)
  {

  }

  /**
   * Defines what should be done with each incoming tuple
   */
  protected void processTuple(byte[] tuple)
  {
    GenericRecord record = (GenericRecord) decoder.fromBytes(tuple);
    output.emit(record.toString());
  }

  /**
   * End window call for the operator
   * If sending per window, emit the updated counts here.
   */
  @Override
  public void endWindow()
  {

  }

}
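
A note on the trace above: the "Serialization trace" line names the decoder field of the operator, so Kryo appears to be trying to recreate a KafkaAvroDecoder as part of the operator's state and failing because that class has no no-arg constructor. Since setup() builds the decoder anyway, one likely fix is to declare the field transient so it is left out of serialization. The sketch below shows that pattern in plain Java. It uses Java's built-in serialization as a stand-in for Kryo (both skip transient fields by default), and FakeDecoder and ConversionOperator are hypothetical stand-ins, not the real Confluent or Apex classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class TransientDecoderSketch {

  // Hypothetical stand-in for KafkaAvroDecoder: no no-arg constructor, not serializable.
  static class FakeDecoder {
    private final String registryUrl;

    FakeDecoder(String registryUrl) {
      this.registryUrl = registryUrl;
    }

    String fromBytes(byte[] data) {
      return data.length + " bytes via " + registryUrl;
    }
  }

  // Hypothetical stand-in for the operator.
  static class ConversionOperator implements Serializable {
    private static final long serialVersionUID = 1L;

    private String schemaRegURL;            // plain configuration: serialized
    private transient FakeDecoder decoder;  // skipped by serialization, rebuilt in setup()

    ConversionOperator(String schemaRegURL) {
      this.schemaRegURL = schemaRegURL;
    }

    // Plays the role of setup(OperatorContext): recreates the non-serializable helper.
    void setup() {
      decoder = new FakeDecoder(schemaRegURL);
    }

    String process(byte[] tuple) {
      return decoder.fromBytes(tuple);
    }
  }

  // Serializes and deserializes the operator, as happens when it is checkpointed or deployed.
  static ConversionOperator roundTrip(ConversionOperator op) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(op);
    }
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (ConversionOperator) in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    ConversionOperator op = new ConversionOperator("http://registry.example:8081");
    op.setup();
    ConversionOperator restored = roundTrip(op);
    restored.setup(); // the decoder is null after the round trip until setup() runs again
    System.out.println(restored.process(new byte[] {1, 2, 3}));
  }
}
```

In the operator above, the equivalent change would be "private transient KafkaAvroDecoder decoder;", with schemaRegURL left non-transient so that setup() can still read it after the operator is restored.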