Re: kafka producer failed

2015-07-24 Thread Yi Pan
Hi, Selina,

Your question is not clear.
{quote}
When messages were sent to Kafka by KafkaProducer, it always failed
after about 3000-4000 messages.
{quote}

What's failing? The error stack shows errors on the consumer side, yet you
were referring to failures to produce to Kafka. Could you be more specific
about your failure scenario?

-Yi

On Thu, Jul 23, 2015 at 5:46 PM, Job-Selina Wu swucaree...@gmail.com
wrote:

 Hi,

 When messages were sent to Kafka by KafkaProducer, it always failed
 after about 3000-4000 messages. The error is shown below.
 I am wondering if there is any topic size I need to set in the Samza configuration?


 [2015-07-23 17:30:03,792] WARN

 [console-consumer-84579_Selinas-MacBook-Pro.local-1437697324624-eecb4f40-leader-finder-thread],
 Failed to find leader for Set([http-demo,0])
 (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
 kafka.common.KafkaException: fetching topic metadata for topics
 [Set(http-demo)] from broker [ArrayBuffer()] failed
 at
 kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
 at
 kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
 at

 kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
 ^CConsumed 4327 messages

 Your reply and comment will be highly appreciated.


 Sincerely,
 Selina



Review Request 36769: SAMZA-742 Add a Contribution section in README of samza and hello-samza

2015-07-24 Thread Aleksandar Pejakovic

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36769/
---

Review request for samza.


Repository: samza


Description
---

Added Contribution section to README


Diffs
-

  README.md f83fd41 

Diff: https://reviews.apache.org/r/36769/diff/


Testing
---


Thanks,

Aleksandar Pejakovic



Review Request 36768: SAMZA-740: Add ElasticsearchProducer example to samza-hello-samza

2015-07-24 Thread Stuart Davidson

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36768/
---

Review request for samza.


Repository: samza-hello-samza


Description
---

I'd put together an example of how to use the new ElasticsearchProvider for my 
work colleagues and I thought it'd be worth submitting it back to the 
community. This also includes a script to start elasticsearch and kibana on the 
host - be aware, there's a 64bit version depending on what machine you want to 
run against.

Also note, this is against 0.10.0 of Samza which is not released yet. I am 
making the assumption that the Elasticsearch jars are bundled as part of that - 
if not, we'll need to add them to the POM here.


Diffs
-

  bin/grid-elastic PRE-CREATION 
  pom.xml f9c4fa9 
  src/main/assembly/src.xml f57fee2 
  src/main/config/wikipedia-elastic.properties PRE-CREATION 
  src/main/java/samza/examples/wikipedia/task/WikipediaElasticStreamTask.java 
PRE-CREATION 

Diff: https://reviews.apache.org/r/36768/diff/


Testing
---


Thanks,

Stuart Davidson



Review Request 36770: SAMZA-742 Add a Contribution section in README of samza and hello-samza

2015-07-24 Thread Aleksandar Pejakovic

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36770/
---

Review request for samza.


Repository: samza-hello-samza


Description
---

Hello Samza: added Contribution section to README


Diffs
-

  README.md 14bf45e 

Diff: https://reviews.apache.org/r/36770/diff/


Testing
---


Thanks,

Aleksandar Pejakovic



Review Request 36767: SAMZA-423: Integrate Lucene into Samza

2015-07-24 Thread Robert Zuljevic

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36767/
---

Review request for samza.


Repository: samza


Description
---

Initial commit to get feedback. Implemented skeleton for DocumentStore and an 
initial Luwak implementation.


Diffs
-

  build.gradle 0852adc4e8e0c2816afd1ebf433f1af6b44852f7 
  gradle/dependency-versions.gradle fb06e8ed393d1a38abfa1a48fe5244fc7f6c7339 
  
samza-ds-luwak/src/main/java/org/apache/samza/storage/document/luwak/engine/LuwakDocumentStorageEngineFactory.java
 PRE-CREATION 
  
samza-ds-luwak/src/main/java/org/apache/samza/storage/document/luwak/model/MonitorQueryElement.java
 PRE-CREATION 
  
samza-ds-luwak/src/main/java/org/apache/samza/storage/document/luwak/store/LuwakDocumentStore.java
 PRE-CREATION 
  
samza-ds/src/main/java/org/apache/samza/storage/document/engine/BaseDocumentStorageEngineFactory.java
 PRE-CREATION 
  
samza-ds/src/main/java/org/apache/samza/storage/document/engine/DocumentStorageEngine.java
 PRE-CREATION 
  samza-ds/src/main/java/org/apache/samza/storage/document/model/Element.java 
PRE-CREATION 
  
samza-ds/src/main/java/org/apache/samza/storage/document/store/CachedDocumentStore.java
 PRE-CREATION 
  
samza-ds/src/main/java/org/apache/samza/storage/document/store/DocumentStore.java
 PRE-CREATION 
  
samza-ds/src/main/java/org/apache/samza/storage/document/store/LoggedDocumentStore.java
 PRE-CREATION 
  
samza-ds/src/main/java/org/apache/samza/storage/document/store/MetricDocumentStore.java
 PRE-CREATION 
  
samza-ds/src/main/java/org/apache/samza/storage/document/store/metrics/DocumentStoreMetrics.java
 PRE-CREATION 
  settings.gradle 19bff971ad221084dac10d3f7f3facfa42b829a7 

Diff: https://reviews.apache.org/r/36767/diff/


Testing
---


Thanks,

Robert Zuljevic



Re: Samza: can not produce new data to kafka

2015-07-24 Thread Shadi Noghabi
Selina,

You should probably check a few things:
1. Your log files, to see if you have any errors. Also, does your job fail or
continue running?
2. Does this line logger.info("key=" + key + ": message=" + message); write
any logs?
3. This might not be the only reason, but you are sending messages of
type Map<String, String>. However, in your config file, you defined
systems.kafka.samza.msg.serde=string, which expects the message to be a
String.
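
For illustration, a minimal sketch of point 3 (the class name is hypothetical,
and this is only one possible fix): send a String so the message matches the
configured systems.kafka.samza.msg.serde=string.

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

public class HttpDemoStringForwardTask implements StreamTask {
    private static final SystemStream OUTPUT_STREAM =
        new SystemStream("kafka", "demo-duplicate");

    @Override
    public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
                        TaskCoordinator coordinator) {
        String key = (String) envelope.getKey();
        // toString() yields a String, which the configured string serde can
        // handle; an alternative is to register a json serde for the Map payload.
        String message = envelope.getMessage().toString();
        collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, key, message));
    }
}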


Shadi


On Thu, Jul 23, 2015 at 6:33 PM, Job-Selina Wu swucaree...@gmail.com
wrote:

 Hi,  All

  I am trying to write my first StreamTask class. I have a topic at
 Kafka called http-demo. I would like to read that topic and write it to another
 topic called demo-duplicate.

 However, no topic is written to Kafka.

 My properties file and StreamTask are below. Can anyone tell me what
 the bug is?
 BTW, if I set checkpoint or metrics in the properties file, the checkpoint
 and metrics topics are written to Kafka, and the content of the
 input topic -- http-demo -- is shown correctly.

 Your help is highly appreciated.

 Sincerely,
 Selina


 - - -- - - - - -
 # Job
 job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
 job.name=demo-parser

 # YARN

 yarn.package.path=file:///Users/selina/IdeaProjects/samza-Demo/target/hello-samza-0.9.1-dist.tar.gz

 # Task
 task.class=samza.http.demo.task.HttpDemoParserStreamTask
 task.inputs=kafka.http-demo

 # Serializers

 serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory

 # Kafka System

 systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
 systems.kafka.samza.msg.serde=string
 systems.kafka.samza.key.serde=string
 systems.kafka.consumer.zookeeper.connect=localhost:2181/
 systems.kafka.consumer.auto.offset.reset=largest
 systems.kafka.producer.bootstrap.servers=localhost:9092
 - - -- - - - - -

 My StreamTask class is also simple:

 -

 /**
  *
  * Read data from http-demo topic and write it back to demo-duplicate
  */
 public class HttpDemoParserStreamTask implements StreamTask {

     private static final SystemStream OUTPUT_STREAM =
         new SystemStream("kafka", "demo-duplicate");
     Logger logger = LoggerFactory.getLogger(HttpDemoParserStreamTask.class);

     @SuppressWarnings("unchecked")
     @Override
     public void process(IncomingMessageEnvelope envelope,
             MessageCollector collector, TaskCoordinator coordinator)
             throws Exception {
         String key = (String) envelope.getKey();
         String message = envelope.getMessage().toString();
         logger.info("key=" + key + ": message=" + message);

         Map<String, String> outgoingMap =
             (Map<String, String>) envelope.getMessage();
         collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, outgoingMap));
         //collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message));
     }
 }

 ---



Re: kafka producer failed

2015-07-24 Thread Job-Selina Wu
Hi, Yi:

  I am wondering if the problem can be fixed by the max.message.size
parameter in kafka.producer.ProducerConfig?

  My HTTP server sends messages to Kafka. The last message shown on the
console is:
message=timestamp=06-20-2015 id=678 ip=22.231.113.68 browser=Safari
postalCode=95066 url=http://sample2.com language=ENG mobileBrand=Apple
count=4269

However, Kafka got an exception starting from the 4244th message.
The error is below, and Kafka does not accept any new messages after this.

[2015-07-24 12:46:11,078] WARN
[console-consumer-61156_Selinas-MacBook-Pro.local-1437766693294-a68fc532-leader-finder-thread],
Failed to find leader for Set([http-demo,0])
(kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
kafka.common.KafkaException: fetching topic metadata for topics
[Set(http-demo)] from broker [ArrayBuffer(id:0,host:10.1.10.173,port:9092)]
failed
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
at
kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
Caused by: java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
at
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
... 3 more
[2015-07-24 12:46:11,287] WARN Fetching topic metadata with correlation id
21 for topics [Set(http-demo)] from broker
[id:0,host:10.1.10.173,port:9092] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
at
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
at
kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)


After the error:
I can list the topic, which looks right, but I cannot show its content from the command line.

Selinas-MacBook-Pro:samza-Demo selina$ deploy/kafka/bin/kafka-topics.sh
--list --zookeeper localhost:2181
http-demo
Selinas-MacBook-Pro:samza-Demo selina$
deploy/kafka/bin/kafka-console-consumer.sh
--zookeeper localhost:2181 --from-beginning --topic http-demo
[2015-07-24 12:47:38,730] WARN
[console-consumer-10297_Selinas-MacBook-Pro.local-1437767258570-1a809d87],
no brokers found when trying to rebalance.
(kafka.consumer.ZookeeperConsumerConnector)

Attached are my Kafka properties for the server and producer.

Your help is highly appreciated


Sincerely,
Selina



On Thu, Jul 23, 2015 at 11:16 PM, Yi Pan nickpa...@gmail.com wrote:

 Hi, Selina,

 Your question is not clear.
 {quote}
  When messages were sent to Kafka by KafkaProducer, it always failed
  after about 3000-4000 messages.
 {quote}

  What's failing? The error stack shows errors on the consumer side, yet you
  were referring to failures to produce to Kafka. Could you be more specific
  about your failure scenario?

 -Yi

 On Thu, Jul 23, 2015 at 5:46 PM, Job-Selina Wu swucaree...@gmail.com
 wrote:

  Hi,
 
   When messages were sent to Kafka by KafkaProducer, it always failed
   after about 3000-4000 messages. The error is shown below.
   I am wondering if there is any topic size I need to set in the Samza configuration?
 
 
  [2015-07-23 17:30:03,792] WARN
 
 
 [console-consumer-84579_Selinas-MacBook-Pro.local-1437697324624-eecb4f40-leader-finder-thread],
  Failed to find leader for Set([http-demo,0])
  (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
  kafka.common.KafkaException: fetching topic metadata for topics
  [Set(http-demo)] from broker [ArrayBuffer()] failed
  at
  kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
  at
  kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
  at
 
 
 kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
  at
 kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
  ^CConsumed 4327 messages
 
  Your reply and comment will be highly appreciated.
 
 
  Sincerely,
  Selina
 



Re: kafka producer failed

2015-07-24 Thread Job-Selina Wu
Hi, All:

 Do you think it could be caused by memory or virtual memory size?

Sincerely,
Selina



On Fri, Jul 24, 2015 at 1:54 PM, Job-Selina Wu swucaree...@gmail.com
wrote:

 Hi, Navina:

 Thanks for your reply: the files are listed below:

 Your help is highly appreciated.

 Sincerely,
 Selina

 The producer.properties for
 Kafka:


 # Producer Basics #

 # list of brokers used for bootstrapping knowledge about the rest of the
 cluster
 # format: host1:port1,host2:port2 ...
 metadata.broker.list=localhost:9092

 # name of the partitioner class for partitioning events; default partition
 spreads data randomly
 #partitioner.class=

 # specifies whether the messages are sent asynchronously (async) or
 synchronously (sync)
 producer.type=sync

 # specify the compression codec for all data generated: none, gzip,
 snappy, lz4.
 # the old config values work as well: 0, 1, 2, 3 for none, gzip, snappy,
 lz4, respectively
 compression.codec=none

 # message encoder
 serializer.class=kafka.serializer.DefaultEncoder

 # allow topic level compression
 #compressed.topics=

 # Async Producer #
 # maximum time, in milliseconds, for buffering data on the producer queue
 #queue.buffering.max.ms=

 # the maximum size of the blocking queue for buffering on the producer
 #queue.buffering.max.messages=

 # Timeout for event enqueue:
 # 0: events will be enqueued immediately or dropped if the queue is full
 # -ve: enqueue will block indefinitely if the queue is full
 # +ve: enqueue will block up to this many milliseconds if the queue is full
 #queue.enqueue.timeout.ms=

 # the number of messages batched at the producer
 #batch.num.messages=


 -the server.properties for
 Kafka--
 # Server Basics #

 # The id of the broker. This must be set to a unique integer for each
 broker.
 broker.id=0

 # Socket Server Settings
 #

 # The port the socket server listens on
 port=9092

 # Hostname the broker will bind to. If not set, the server will bind to
 all interfaces
 #host.name=localhost

 # Hostname the broker will advertise to producers and consumers. If not
 set, it uses the
 # value for host.name if configured.  Otherwise, it will use the value
 returned from
 # java.net.InetAddress.getCanonicalHostName().
 #advertised.host.name=hostname routable by clients

 # The port to publish to ZooKeeper for clients to use. If this is not set,
 # it will publish the same port that the broker binds to.
 #advertised.port=port accessible by clients

 # The number of threads handling network requests
 num.network.threads=3

 # The number of threads doing disk I/O
 num.io.threads=8

 # The send buffer (SO_SNDBUF) used by the socket server
 socket.send.buffer.bytes=102400

 # The receive buffer (SO_RCVBUF) used by the socket server
 socket.receive.buffer.bytes=102400

 # The maximum size of a request that the socket server will accept
 (protection against OOM)
 socket.request.max.bytes=104857600


 # Log Basics #

 # A comma-separated list of directories under which to store log files
 log.dirs=/tmp/kafka-logs

 # The default number of log partitions per topic. More partitions allow
 greater
 # parallelism for consumption, but this will also result in more files
 across
 # the brokers.
 num.partitions=1

 # The number of threads per data directory to be used for log recovery at
 startup and flushing at shutdown.
 # This value is recommended to be increased for installations with data
 dirs located in RAID array.
 num.recovery.threads.per.data.dir=1

 # Log Flush Policy
 #

 # Messages are immediately written to the filesystem but by default we
 only fsync() to sync
 # the OS cache lazily. The following configurations control the flush of
 data to disk.
 # There are a few important trade-offs here:
 #1. Durability: Unflushed data may be lost if you are not using
 replication.
 #2. Latency: Very large flush intervals may lead to latency spikes
 when the flush does occur as there will be a lot of data to flush.
 #3. Throughput: The flush is generally the most expensive operation,
 and a small flush interval may lead to excessive seeks.
 # The settings below allow one to configure the flush policy to flush data
 after a period of time or
 # every N messages (or both). This can be done globally and overridden on
 a per-topic basis.

 # The number of messages to accept before forcing a flush of data to disk
 #log.flush.interval.messages=1

 # The maximum amount of time a message can sit in a log before we force a
 flush
 #log.flush.interval.ms=1000

 # 

Re: Can I get an example of using the ElasticSearch producer?

2015-07-24 Thread Job-Selina Wu
Dear All:

I would like to have an example of using the ElasticSearch producer as well.


Thanks
Selina
swucaree...@gmail.com




On Fri, Jul 24, 2015 at 1:03 PM, Woessner, Leo leo.woess...@pearson.com
wrote:

 Can I get an example of using the ElasticSearch producer?

 leo.woess...@pearson.com



Re: Can I get an example of using the ElasticSearch producer?

2015-07-24 Thread Yan Fang
Hi guys,

Thank you for being interested in this new producer. The producer is only
in the master branch, so if you are using the 0.9.1 version, you won't get
this support.

If by any chance you are using the latest version,

1) here is some configuration documentation:
http://samza.apache.org/learn/documentation/latest/jobs/configuration-table.html
; see the "Using Elasticsearch for output streams" part.

2) check this patch https://issues.apache.org/jira/browse/SAMZA-740 as
well. We haven't merged it to hello-samza yet, but it will give you some idea of
how to implement it. :)

Thanks,

Fang, Yan
yanfang...@gmail.com

On Fri, Jul 24, 2015 at 1:20 PM, Job-Selina Wu swucaree...@gmail.com
wrote:

 Dear All:

 I would like to have an example of using the ElasticSearch producer as well.


 Thanks
 Selina
 swucaree...@gmail.com




 On Fri, Jul 24, 2015 at 1:03 PM, Woessner, Leo leo.woess...@pearson.com
 wrote:

  Can I get an example of using the ElasticSearch producer?
 
  leo.woess...@pearson.com
 



Re: kafka producer failed

2015-07-24 Thread Job-Selina Wu
Hi, Navina:

Thanks for your reply: the files are listed below:

Your help is highly appreciated.

Sincerely,
Selina

The producer.properties for
Kafka:


# Producer Basics #

# list of brokers used for bootstrapping knowledge about the rest of the
cluster
# format: host1:port1,host2:port2 ...
metadata.broker.list=localhost:9092

# name of the partitioner class for partitioning events; default partition
spreads data randomly
#partitioner.class=

# specifies whether the messages are sent asynchronously (async) or
synchronously (sync)
producer.type=sync

# specify the compression codec for all data generated: none, gzip, snappy,
lz4.
# the old config values work as well: 0, 1, 2, 3 for none, gzip, snappy,
lz4, respectively
compression.codec=none

# message encoder
serializer.class=kafka.serializer.DefaultEncoder

# allow topic level compression
#compressed.topics=

# Async Producer #
# maximum time, in milliseconds, for buffering data on the producer queue
#queue.buffering.max.ms=

# the maximum size of the blocking queue for buffering on the producer
#queue.buffering.max.messages=

# Timeout for event enqueue:
# 0: events will be enqueued immediately or dropped if the queue is full
# -ve: enqueue will block indefinitely if the queue is full
# +ve: enqueue will block up to this many milliseconds if the queue is full
#queue.enqueue.timeout.ms=

# the number of messages batched at the producer
#batch.num.messages=


-the server.properties for
Kafka--
# Server Basics #

# The id of the broker. This must be set to a unique integer for each
broker.
broker.id=0

# Socket Server Settings
#

# The port the socket server listens on
port=9092

# Hostname the broker will bind to. If not set, the server will bind to all
interfaces
#host.name=localhost

# Hostname the broker will advertise to producers and consumers. If not
set, it uses the
# value for host.name if configured.  Otherwise, it will use the value
returned from
# java.net.InetAddress.getCanonicalHostName().
#advertised.host.name=hostname routable by clients

# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
#advertised.port=port accessible by clients

# The number of threads handling network requests
num.network.threads=3

# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept
(protection against OOM)
socket.request.max.bytes=104857600


# Log Basics #

# A comma-separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow
greater
# parallelism for consumption, but this will also result in more files
across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at
startup and flushing at shutdown.
# This value is recommended to be increased for installations with data
dirs located in RAID array.
num.recovery.threads.per.data.dir=1

# Log Flush Policy #

# Messages are immediately written to the filesystem but by default we only
fsync() to sync
# the OS cache lazily. The following configurations control the flush of
data to disk.
# There are a few important trade-offs here:
#1. Durability: Unflushed data may be lost if you are not using
replication.
#2. Latency: Very large flush intervals may lead to latency spikes when
the flush does occur as there will be a lot of data to flush.
#3. Throughput: The flush is generally the most expensive operation,
and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data
after a period of time or
# every N messages (or both). This can be done globally and overridden on a
per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=1

# The maximum amount of time a message can sit in a log before we force a
flush
#log.flush.interval.ms=1000

# Log Retention Policy
#

# The following configurations control the disposal of log segments. The
policy can
# be set to delete segments after a period of time, or after a given size
has accumulated.
# A segment will be deleted whenever *either* of these 

Re: Samza: can not produce new data to kafka

2015-07-24 Thread Yan Fang
{quote}
 I did not set auto.create.topics.enable anywhere
{quote}

Fine. Then it defaults to true. No worries.

{quote}
My job is listed below. However, I am wondering how I can know whether my
method public void process(IncomingMessageEnvelope envelope,
MessageCollector collector, TaskCoordinator coordinator) was run or not.
{quote}

If you have logging enabled (from the code, you did), you can check the
container's log to see if it has the output. Assuming you are using the local
YARN that hello-samza provides, you should be able to check the logs
in deploy/yarn/userlogs/application_Id.

If you print to stdout, you can see the result in the
deploy/yarn/userlogs/application_Id's sysout file (if the StreamTask
works).

If it does not work, you can check the logs in
deploy/yarn/userlogs/application_Id as well to see the exceptions, if there
are any.
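
For example, a throwaway diagnostic task along these lines (hypothetical
class, sketch only) makes the check easy, since every invocation of process()
leaves a line in the container's stdout file:

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

public class StdoutProbeStreamTask implements StreamTask {
    @Override
    public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
                        TaskCoordinator coordinator) {
        // Shows up in the container's stdout file under
        // deploy/yarn/userlogs/application_Id if, and only if,
        // process() is actually invoked.
        System.out.println("process() called: key=" + envelope.getKey()
            + " message=" + envelope.getMessage());
    }
}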

Thanks,

Fang, Yan
yanfang...@gmail.com

On Fri, Jul 24, 2015 at 1:45 PM, Job-Selina Wu swucaree...@gmail.com
wrote:

 Hi, Yan and Shadi:

  I made a mistake. Actually, there is no log at /tmp/kafka-logs
 created by logger.info("key=" + key + ": message=" + message);. The log I
 provided is actually the log for the input topic http-demo at
 /tmp/kafka-logs/http-demo-0.

 My job is listed below. However, I am wondering how I can know whether
 my method public void process(IncomingMessageEnvelope envelope,
 MessageCollector collector, TaskCoordinator coordinator) was run or not.

 I manually created the topic demo-duplicate from the command line;
 otherwise it would be created by the Samza code.

 I checked that I did not set auto.create.topics.enable anywhere. Attached
 is my properties file for Kafka.


Your help is highly appreciated

 Sincerely,
 Selina

 [image: Inline image 1]




 On Fri, Jul 24, 2015 at 11:56 AM, Yan Fang yanfang...@gmail.com wrote:

  The code and the property seem good to me. collector.send(new
  OutgoingMessageEnvelope(OUTPUT_STREAM, message)); should work. So I am
  curious if you accidentally disabled auto.create.topics.enable... Can you
  also try to send messages from the command line to demo-duplicate to see if
  it gets anything.

 Let me know if it works.

 Thanks,

 Fang, Yan
 yanfang...@gmail.com

 On Fri, Jul 24, 2015 at 11:48 AM, Job-Selina Wu swucaree...@gmail.com
 wrote:

  Hi, Shadi:
 
    Thanks a lot for your reply.
  1. There is no error log in Kafka or Samza.

  2. This line logger.info("key=" + key + ": message=" + message); writes
  the log correctly, as below:
 
  [image: Inline image 1]
 
  These are my last two messages, with the right count.
 
  3. I tried both ways below; neither of them creates the topic, but I will try
  again.
 
  collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, outgoingMap));
 
  //collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message));
 
  4. I wrote a topic called http-demo to Kafka as my input, and the content
  can be shown with the command line below, so Kafka should be OK.
  deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181
  --from-beginning --topic http-demo
 
  Your help is highly appreciated.
 
  Sincerely,
  Selina
 
 
 
 
  On Fri, Jul 24, 2015 at 9:29 AM, Shadi Noghabi 
  snogh...@linkedin.com.invalid wrote:
 
  Selina,
 
  You should probably check a few things:
  1. Your log files, to see if you have any errors. Also, does your job fail
  or continue running?
  2. Does this line logger.info("key=" + key + ": message=" + message);
  write any logs?
  3. This might not be the only reason, but you are sending messages of
  type Map<String, String>. However, in your config file, you defined
  systems.kafka.samza.msg.serde=string, which expects the message to be a
  String.
 
 
  Shadi
 
 
  On Thu, Jul 23, 2015 at 6:33 PM, Job-Selina Wu swucaree...@gmail.com
  wrote:
 
   Hi,  All
  
    I am trying to write my first StreamTask class. I have a topic at
   Kafka called http-demo. I would like to read that topic and write it to
   another topic called demo-duplicate.
  
   However, no topic is written to Kafka.
  
   My properties file and StreamTask are below. Can anyone tell me
   what the bug is?
   BTW, if I set checkpoint or metrics in the properties file, the
   checkpoint and metrics topics are written to Kafka, and the content of
   the input topic -- http-demo -- is shown correctly.
  
   Your help is highly appreciated.
  
   Sincerely,
   Selina
  
  
   - - -- - - - - -
   # Job
   job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
   job.name=demo-parser
 
  
   # YARN
  
  
 
 yarn.package.path=file:///Users/selina/IdeaProjects/samza-Demo/target/hello-samza-0.9.1-dist.tar.gz
  
   # Task
   task.class=samza.http.demo.task.HttpDemoParserStreamTask
   task.inputs=kafka.http-demo
  
   # Serializers
  
  
 
 serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
  
   # Kafka System
  
  
 
 systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
   systems.kafka.samza.msg.serde=string
   

Re: Samza: can not produce new data to kafka

2015-07-24 Thread Job-Selina Wu
Hi, Yan:

Thanks for replying to my email in detail. All the files in the YARN logs are
shown below. There is no exception under samza-Demo/deploy/yarn/logs. I guess
the StreamTask was not called ...


A partial stdout file
(samza-Demo/deploy/yarn/logs/userlogs/application_1437767867729_0001/container_1437767867729_0001_01_01/stderr)
is pasted below. In short, the log from logger.info("key=" + key + ":
message=" + message); was not generated.

/Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/bin/java
-server -Dsamza.container.name=samza-application-master
-Dlog4j.configuration=file:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_01/__package/lib/log4j.xml
-Dsamza.log.dir=/Users/selina/IdeaProjects/samza-Demo/deploy/yarn/logs/userlogs/application_1437767867729_0001/container_1437767867729_0001_01_01
-Djava.io.tmpdir=/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_01/__package/tmp
-Xmx768M -XX:+PrintGCDateStamps
-Xloggc:/Users/selina/IdeaProjects/samza-Demo/deploy/yarn/logs/userlogs/application_1437767867729_0001/container_1437767867729_0001_01_01/gc.log
-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10
-XX:GCLogFileSize=10241024 -d64 -cp
/Users/selina/IdeaProjects/samza-Demo/deploy/yarn/etc/hadoop:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_01/__package/lib/activation-1.1.jar:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_01/__package/lib/akka-actor_2.10-2.1.2.jar:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_01/__package/lib/aopalliance-1.0.jar:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_01/__package/lib/asm-3.1.jar:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_01/__package/lib/avro-1.7.4.jar:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_01/__package/lib/commons-beanutils-1.7.0.jar:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_01/__package/lib/commons-beanutils-core-1.8.0.jar:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_01/__package/lib/commons-cli-1.2


The file gc.log.0.current shows Allocation Failure and Full GC entries:

CommandLine flags: -XX:GCLogFileSize=10241024 -XX:InitialHeapSize=268435456
-XX:MaxHeapSize=805306368 -XX:NumberOfGCLogFiles=10 -XX:+PrintGC
-XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps
-XX:+UseCompressedClassPointers -XX:+UseCompressedOops
-XX:+UseGCLogFileRotation -XX:+UseParallelGC

2015-07-24T13:28:56.901+0800: 0.694: [GC (Allocation Failure)
65536K->8449K(251392K), 0.0062314 secs]
2015-07-24T13:28:57.188+0800: 0.981: [GC (System.gc())
39240K->6305K(251392K), 0.0047744 secs]
2015-07-24T13:28:57.193+0800: 0.986: [Full GC (System.gc())
6305K->5940K(251392K), 0.0147206 secs]
2015-07-24T13:28:57.625+0800: 1.418: [GC (Allocation Failure)
71476K->12511K(251392K), 0.0030179 secs]
2015-07-24T13:28:59.889+0800: 3.682: [GC (Allocation Failure)
78047K->13859K(251392K), 0.0052610 secs]
2015-07-24T13:29:15.487+0800: 19.280: [GC (Metadata GC Threshold)
35659K->10106K(251392K), 0.0036350 secs]
2015-07-24T13:29:15.490+0800: 19.284: [Full GC (Metadata GC
Threshold) 10106K->7318K(149504K), 0.0200118 secs]



[image: Inline image 1]

   Your help is highly appreciated.

Sincerely,
Selina

On Fri, Jul 24, 2015 at 1:51 PM, Yan Fang yanfang...@gmail.com wrote:

 {quote}
  I did not set auto.create.topics.enable anywhere
 {quote}

 Fine. Then it defaults to true. No worries.

 {quote}
 My job is listed below. However, I am wondering how I can know whether my
 method public void process(IncomingMessageEnvelope envelope,
 MessageCollector collector, TaskCoordinator coordinator) was run or not.
 {quote}

 If you have logging enabled (from the code, you did), you can check the
 container's log to see if it has the output. Assuming you are using the local
 YARN that hello-samza provides, you should be able to check the logs
 in deploy/yarn/userlogs/application_Id.

 If you print to stdout, you can see the result in the
 deploy/yarn/userlogs/application_Id's sysout file (if the StreamTask
 works).

 If it does not work, you can check the logs in
 deploy/yarn/userlogs/application_Id as well to see the exceptions, if there
 are any.

 Thanks,

 Fang, Yan
 yanfang...@gmail.com

 On Fri, Jul 24, 2015 at 1:45 PM, Job-Selina Wu swucaree...@gmail.com
 wrote:

 Hi, Yan and Shadi:

 I made a mistake.  Actually, there is no log at 

Re: Review Request 36768: SAMZA-740: Add ElasticsearchProducer example to samza-hello-samza

2015-07-24 Thread Navina Ramesh

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36768/#review92951
---



bin/grid-elastic (line 1)
https://reviews.apache.org/r/36768/#comment147209

Can you make changes to bin/grid by adding options to install 
elastic_search and/or kibana? Code seems redundant.



pom.xml (line 116)
https://reviews.apache.org/r/36768/#comment147210

You won't need this if you apply your patch to the latest branch in 
samza-hello-samza.


I believe ElasticsearchProducer is going to be part of the 0.10 release in 
Samza. So, this patch should be against the latest branch in hello-samza. 
This diff looks like it is against the master branch. Can you please make the 
changes against the latest branch?

- Navina Ramesh


On July 24, 2015, 8:31 a.m., Stuart Davidson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36768/
 ---
 
 (Updated July 24, 2015, 8:31 a.m.)
 
 
 Review request for samza.
 
 
 Repository: samza-hello-samza
 
 
 Description
 ---
 
 I'd put together an example of how to use the new ElasticsearchProvider for 
 my work colleagues and I thought it'd be worth submitting it back to the 
 community. This also includes a script to start elasticsearch and kibana on 
 the host - be aware, there's a 64bit version depending on what machine you 
 want to run against.
 
 Also note, this is against 0.10.0 of Samza which is not released yet. I am 
 making the assumption that the Elasticsearch jars are bundled as part of that 
 - if not, we'll need to add them to the POM here.
 
 
 Diffs
 -
 
   bin/grid-elastic PRE-CREATION 
   pom.xml f9c4fa9 
   src/main/assembly/src.xml f57fee2 
   src/main/config/wikipedia-elastic.properties PRE-CREATION 
   src/main/java/samza/examples/wikipedia/task/WikipediaElasticStreamTask.java 
 PRE-CREATION 
 
 Diff: https://reviews.apache.org/r/36768/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Stuart Davidson
 




Re: Review Request 36770: SAMZA-742 Add a Contribution section in README of samza and hello-samza

2015-07-24 Thread Navina Ramesh

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36770/#review92936
---

Ship it!


Ship It!

- Navina Ramesh


On July 24, 2015, 8:57 a.m., Aleksandar Pejakovic wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36770/
 ---
 
 (Updated July 24, 2015, 8:57 a.m.)
 
 
 Review request for samza.
 
 
 Repository: samza-hello-samza
 
 
 Description
 ---
 
 Hello Samza: added Contribution section to README
 
 
 Diffs
 -
 
   README.md 14bf45e 
 
 Diff: https://reviews.apache.org/r/36770/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Aleksandar Pejakovic
 




Re: Review Request 36545: SAMZA-682 Refactor Coordinator stream messages

2015-07-24 Thread Navina Ramesh

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36545/#review92938
---


Thanks for picking this up! It feels good to look at refactored code.

One suggestion: please run all the integration tests (including the zopkio
tests) before checking in this patch. I don't think we cleanly start and stop
coordinator stream producers/consumers in all the managers. Please verify that
nothing is broken due to this change.


samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala (line 
627)
https://reviews.apache.org/r/36545/#comment147198

LocalityManager maintains a container-to-host mapping, not a task-to-host
mapping. Please change this back to containerId.


- Navina Ramesh


On July 24, 2015, 12:27 p.m., József Márton Jung wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36545/
 ---
 
 (Updated July 24, 2015, 12:27 p.m.)
 
 
 Review request for samza.
 
 
 Repository: samza
 
 
 Description
 ---
 
  The following has been refactored:
  1. Static inner classes from CoordinatorStreamMessage have been extracted.
  2. Common functionality from CheckpointManager, ChangelogMappingManager, and
  LocalityManager has been moved to a base class.
 
 
 Diffs
 -
 
   checkstyle/import-control.xml 6654319 
   samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java 
 7445996 
   samza-core/src/main/java/org/apache/samza/container/LocalityManager.java 
 55c258f 
   
 samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java
  PRE-CREATION 
   
 samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
  e5ab4fb 
   
 samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
  b1078bd 
   
 samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
  92f8907 
   
 samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
  f769756 
   
 samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/CoordinatorStreamMessage.java
  PRE-CREATION 
   
 samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/Delete.java
  PRE-CREATION 
   
 samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetChangelogMapping.java
  PRE-CREATION 
   
 samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetCheckpoint.java
  PRE-CREATION 
   
 samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetConfig.java
  PRE-CREATION 
   
 samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java
  PRE-CREATION 
   samza-core/src/main/java/org/apache/samza/job/model/JobModel.java ad6387d 
   
 samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java
  7d3409c 
   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
 27b2517 
   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
 f621611 
   samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala 1c178a6 
   
 samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
  e454593 
   
 samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java
  ac26a01 
   
 samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
  c25f6a7 
   
 samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
  1ef07d0 
   
 samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java
  c484660 
   
 samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 
 84fdeaa 
   samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml 41303f7 
 
 Diff: https://reviews.apache.org/r/36545/diff/
 
 
 Testing
 ---
 
 Tests has been updated.
 
 
 Thanks,
 
 József Márton Jung
 




Re: Samza: can not produce new data to kafka

2015-07-24 Thread Yan Fang
The code and the property seem good to me. collector.send(new
OutgoingMessageEnvelope(OUTPUT_STREAM, message)); should work. So I am
curious if you accidentally disabled auto.create.topics.enable... Can you
also try to send messages from the command line to demo-duplicate to see if it
gets anything.
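
For example (assuming the hello-samza deploy layout used elsewhere in this
thread), something like:

deploy/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic demo-duplicate

and then read it back with:

deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic demo-duplicate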

Let me know if it works.

Thanks,

Fang, Yan
yanfang...@gmail.com

On Fri, Jul 24, 2015 at 11:48 AM, Job-Selina Wu swucaree...@gmail.com
wrote:

 Hi, Shadi:

    Thanks a lot for your reply.
 1. There is no error log in Kafka or Samza.

 2. This line logger.info("key=" + key + ": message=" + message); writes
 the log correctly, as below:

 [image: Inline image 1]

 These are my last two messages, with the right count.

 3. I tried both ways below; neither of them creates the topic, but I will try
 again.

 collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, outgoingMap));

 //collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message));

 4. I wrote a topic called http-demo to Kafka as my input, and the content
 can be shown with the command line below, so Kafka should be OK.
 deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181
 --from-beginning --topic http-demo

 Your help is highly appreciated.

 Sincerely,
 Selina




 On Fri, Jul 24, 2015 at 9:29 AM, Shadi Noghabi 
 snogh...@linkedin.com.invalid wrote:

 Selina,

 You should probably check a few things:
 1. Your log files, to see if you have any errors. Also, does your job fail
 or continue running?
 2. Does this line logger.info("key=" + key + ": message=" + message);
 write any logs?
 3. This might not be the only reason, but you are sending messages of
 type Map<String, String>. However, in your config file, you defined
 systems.kafka.samza.msg.serde=string, which expects the message to be a
 String.


 Shadi


 On Thu, Jul 23, 2015 at 6:33 PM, Job-Selina Wu swucaree...@gmail.com
 wrote:

  Hi,  All
 
   I am trying to write my first StreamTask class. I have a topic at
  Kafka called http-demo. I would like to read that topic and write it to
  another topic called demo-duplicate.
 
  However, no topic is written to Kafka.
 
  My properties file and StreamTask are below. Can anyone tell me
  what the bug is?
  BTW, if I set checkpoint or metrics in the properties file, the
  checkpoint and metrics topics are written to Kafka, and the content of
  the input topic -- http-demo -- is shown correctly.
 
  Your help is highly appreciated.
 
  Sincerely,
  Selina
 
 
  - - -- - - - - -
  # Job
  job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
  job.name=demo-parser

 
  # YARN
 
 
 yarn.package.path=file:///Users/selina/IdeaProjects/samza-Demo/target/hello-samza-0.9.1-dist.tar.gz
 
  # Task
  task.class=samza.http.demo.task.HttpDemoParserStreamTask
  task.inputs=kafka.http-demo
 
  # Serializers
 
 
 serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
 
  # Kafka System
 
 
 systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
  systems.kafka.samza.msg.serde=string
  systems.kafka.samza.key.serde=string
  systems.kafka.consumer.zookeeper.connect=localhost:2181/
  systems.kafka.consumer.auto.offset.reset=largest
  systems.kafka.producer.bootstrap.servers=localhost:9092
  - - -- - - - - -
 
   My StreamTask class is also simple:
 
   -
 
   /**
    * Read data from http-demo topic and write it back to demo-duplicate
    */
   public class HttpDemoParserStreamTask implements StreamTask {
 
       private static final SystemStream OUTPUT_STREAM =
           new SystemStream("kafka", "demo-duplicate");
       Logger logger = LoggerFactory.getLogger(HttpDemoParserStreamTask.class);
 
       @SuppressWarnings("unchecked")
       @Override
       public void process(IncomingMessageEnvelope envelope,
               MessageCollector collector, TaskCoordinator coordinator)
               throws Exception {
           String key = (String) envelope.getKey();
           String message = envelope.getMessage().toString();
           logger.info("key=" + key + ": message=" + message);
 
           Map<String, String> outgoingMap =
               (Map<String, String>) envelope.getMessage();
           collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, outgoingMap));
           //collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message));
       }
   }
 
  ---
 





Re: Samza: can not produce new data to kafka

2015-07-24 Thread Job-Selina Wu
Hi, Shadi:

  Thanks a lot for your reply.
1. There is no error log in Kafka or Samza.

2. This line logger.info("key=" + key + ": message=" + message); writes the log
correctly, as below:

[image: Inline image 1]

These are my last two messages, with the right count.

3. I tried both ways below; neither of them creates the topic, but I will try
again.

collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, outgoingMap));

//collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message));

4. I wrote a topic called http-demo to Kafka as my input, and the content
can be shown with the command line below, so Kafka should be OK.
deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181
--from-beginning --topic http-demo

Your help is highly appreciated.

Sincerely,
Selina




On Fri, Jul 24, 2015 at 9:29 AM, Shadi Noghabi 
snogh...@linkedin.com.invalid wrote:

 Selina,

 You should probably check a few things:
 1. Your log files, to see if you have any errors. Also, does your job fail or
 continue running?
 2. Does this line logger.info("key=" + key + ": message=" + message); write
 any logs?
 3. This might not be the only reason, but you are sending messages of
 type Map<String, String>. However, in your config file, you defined
 systems.kafka.samza.msg.serde=string, which expects the message to be a
 String.


 Shadi


 On Thu, Jul 23, 2015 at 6:33 PM, Job-Selina Wu swucaree...@gmail.com
 wrote:

  Hi,  All
 
    I am trying to write my first StreamTask class. I have a topic at
   Kafka called http-demo. I would like to read that topic and write it to
   another topic called demo-duplicate.
  
   However, no topic is written to Kafka.
  
   My properties file and StreamTask are below. Can anyone tell me what
   the bug is?
   BTW, if I set checkpoint or metrics in the properties file, the
   checkpoint and metrics topics are written to Kafka, and the content of
   the input topic -- http-demo -- is shown correctly.
 
  Your help is highly appreciated.
 
  Sincerely,
  Selina
 
 
  - - -- - - - - -
  # Job
  job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
  job.name=demo-parser
 
  # YARN
 
 
 yarn.package.path=file:///Users/selina/IdeaProjects/samza-Demo/target/hello-samza-0.9.1-dist.tar.gz
 
  # Task
  task.class=samza.http.demo.task.HttpDemoParserStreamTask
  task.inputs=kafka.http-demo
 
  # Serializers
 
 
 serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
 
  # Kafka System
 
 
 systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
  systems.kafka.samza.msg.serde=string
  systems.kafka.samza.key.serde=string
  systems.kafka.consumer.zookeeper.connect=localhost:2181/
  systems.kafka.consumer.auto.offset.reset=largest
  systems.kafka.producer.bootstrap.servers=localhost:9092
  - - -- - - - - -
 
   My StreamTask class is also simple:
 
   -
 
   /**
    * Read data from http-demo topic and write it back to demo-duplicate
    */
   public class HttpDemoParserStreamTask implements StreamTask {
 
       private static final SystemStream OUTPUT_STREAM =
           new SystemStream("kafka", "demo-duplicate");
       Logger logger = LoggerFactory.getLogger(HttpDemoParserStreamTask.class);
 
       @SuppressWarnings("unchecked")
       @Override
       public void process(IncomingMessageEnvelope envelope,
               MessageCollector collector, TaskCoordinator coordinator)
               throws Exception {
           String key = (String) envelope.getKey();
           String message = envelope.getMessage().toString();
           logger.info("key=" + key + ": message=" + message);
 
           Map<String, String> outgoingMap =
               (Map<String, String>) envelope.getMessage();
           collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, outgoingMap));
           //collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message));
       }
   }
 
  ---
 



Can I get an example of using the ElasticSearch producer?

2015-07-24 Thread Woessner, Leo
Can I get an example of using the ElasticSearch producer?

leo.woess...@pearson.com


Re: Security on YARN

2015-07-24 Thread Chen Song
Can someone give some context on this? I can volunteer myself and try
working on this.

Chen

On Thu, Jul 2, 2015 at 4:29 AM, Qi Fu q...@talend.com wrote:

 Hi Yi  Yan,

 Many thanks for your information. I have created a jira for this:
 https://issues.apache.org/jira/browse/SAMZA-727
 I'm willing to test it if someone can work on this.


 -Qi

 
 From: Yi Pan nickpa...@gmail.com
 Sent: Thursday, July 2, 2015 1:38 AM
 To: dev@samza.apache.org
 Subject: Re: Security on YARN

 Hi, Yan,

 Your memory serves as well as mine. :) I remember that Chris and I
 discussed this Kerberos ticket expiration issue when we were brainstorming
 on how to access HDFS data in Samza. At a high level, what happens is that
 the Kerberos ticket to access a secured Hadoop cluster is issued to Samza
 containers at the job start time, and will expire later. For a long-running
 Samza job, it does not work. We will need a way to refresh the Kerberos
 ticket periodically, which is not supported yet. Chris probably can chime
 in with more details.

 -Yi

 On Wed, Jul 1, 2015 at 4:08 PM, Yan Fang yanfang...@gmail.com wrote:

  Hi Qi,
 
  I think this is caused by the fact that Samza currently does not support
  Yarn with Kerberos. Feel free to open a ticket for this feature.
 
  But if my memory serves, there was an issue mentioned about Kerberos.
  It seems that when the Kerberos ticket expires, Samza will have some issues;
  it cannot find the resource. Does anyone remember this?
 
  Cheers,
 
  Fang, Yan
  yanfang...@gmail.com
 
  On Wed, Jul 1, 2015 at 3:41 AM, Qi Fu q...@talend.com wrote:
 
   Hi all,
  
  
    I'm testing Samza on YARN and I have encountered a problem with the
    security setting of YARN (Kerberos). Here are the details:
   
    1. My cluster is secured by Kerberos, and I deploy my samza job from
    one of the cluster nodes.
  
  
   2. My config file is in ~/.samza/conf/(yarn-site.xml, core-site.xml,
   hdfs-site.xml)
  
  
   3. The job is deployed successfully, and I can get the info such as:
  
   ClientHelper [INFO] set package url to scheme: hdfs port: -1
 file:
   /user/test/samzatest.tar.gz for application_1435680272316_0003
  
   ClientHelper [INFO] set package size to 212924524 for
   application_1435680272316_0003
  
  
  
   I think the security setting is correct as it can get the file size
   from HDFS.
  
  
    4. But I get the following error from the YARN job manager:
  
  
   Application application_1435680272316_0003 failed 2 times due to AM
   Container for appattempt_1435680272316_0003_02 exited with
 exitCode:
   -1000
  
   For more detailed output, check application tracking page:
   http://cdh-namenode:8088/proxy/application_1435680272316_0003/Then,
  click
   on links to logs of each attempt.
  
   Diagnostics: Failed on local exception: java.io.IOException:
   org.apache.hadoop.security.AccessControlException: Client cannot
   authenticate via:[TOKEN, KERBEROS]; Host Details : local host is:
   talend-cdh-datanode8/62.210.141.237; destination host is:
   talend-cdh-namenode:8020;
  
   java.io.IOException: Failed on local exception: java.io.IOException:
   org.apache.hadoop.security.AccessControlException: Client cannot
   authenticate via:[TOKEN, KERBEROS]; Host Details : local host is:
   cdh-datanode8/62.210.141.237; destination host is:
  cdh-namenode:8020;
  
   at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:772)
  
   ..
  
  
  
   Anyone knows how to solve this?
  
  
   Qi FU
  
 




-- 
Chen Song


Re: Security on YARN

2015-07-24 Thread Yan Fang
Hi Chen Song,

If you can work on this issue, it will be great.

1. the related ticket is https://issues.apache.org/jira/browse/SAMZA-727

2. most of the change will happen in Yarn AM and Yarn client parts. The
code sits in the samza-yarn package
https://github.com/apache/samza/tree/master/samza-yarn/src/main/scala/org/apache/samza/job/yarn
.

3. when you implement this, make sure it does not affect the non-secure
YARN implementation, because the non-secure cluster implementation has been
proven to work, while the secure cluster may have the issue Yi Pan
mentioned: "For a long-running Samza job, it does not work. We will need a way
to refresh the Kerberos ticket periodically, which is not supported yet." But I
am happy to see that we at least have some support for a secure cluster. We can
figure that issue out later.
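
To make the refresh idea concrete, a minimal sketch (not existing Samza code;
the class name and keytab handling are assumptions) of the periodic relogin a
long-running container would need, using Hadoop's UserGroupInformation:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.security.UserGroupInformation;

public class KerberosTicketRefresher {
    public static void start(String principal, String keytabPath) throws Exception {
        // Initial login from a keytab (assumption: the keytab is shipped
        // with, or localized next to, the job package).
        UserGroupInformation.loginUserFromKeytab(principal, keytabPath);

        ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(new Runnable() {
            public void run() {
                try {
                    // Re-login before the TGT expires; this is a no-op
                    // while the current ticket is still valid.
                    UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, 1, 1, TimeUnit.HOURS);
    }
}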

If you want to have some help in understanding the existing code, let me
know.

Thanks,

Fang, Yan
yanfang...@gmail.com

On Fri, Jul 24, 2015 at 7:00 PM, Chen Song chen.song...@gmail.com wrote:

 Can someone give some context on this? I can volunteer myself and try
 working on this.

 Chen

 On Thu, Jul 2, 2015 at 4:29 AM, Qi Fu q...@talend.com wrote:

  Hi Yi  Yan,
 
  Many thanks for your information. I have created a jira for this:
  https://issues.apache.org/jira/browse/SAMZA-727
  I'm willing to test it if someone can work on this.
 
 
  -Qi
 
  
  From: Yi Pan nickpa...@gmail.com
  Sent: Thursday, July 2, 2015 1:38 AM
  To: dev@samza.apache.org
  Subject: Re: Security on YARN
 
  Hi, Yan,
 
  Your memory serves as well as mine. :) I remember that Chris and I
  discussed this Kerberos ticket expiration issue when we were brainstorming
  on how to access HDFS data in Samza. At a high level, what happens is that
  the Kerberos ticket to access a secured Hadoop cluster is issued to Samza
  containers at the job start time, and will expire later. For a
 long-running
  Samza job, it does not work. We will need a way to refresh the Kerberos
  ticket periodically, which is not supported yet. Chris probably can chime
  in with more details.
 
  -Yi
 
  On Wed, Jul 1, 2015 at 4:08 PM, Yan Fang yanfang...@gmail.com wrote:
 
   Hi Qi,
  
   I think this is caused by the fact that Samza currently does not
 support
   Yarn with Kerberos. Feel free to open a ticket for this feature.
  
    But if my memory serves, there was an issue mentioned about Kerberos.
    It seems that when the Kerberos ticket expires, Samza will have some issues;
    it cannot find the resource. Does anyone remember this?
  
   Cheers,
  
   Fang, Yan
   yanfang...@gmail.com
  
   On Wed, Jul 1, 2015 at 3:41 AM, Qi Fu q...@talend.com wrote:
  
Hi all,
   
   
 I'm testing Samza on YARN and I have encountered a problem with the
 security setting of YARN (Kerberos). Here are the details:
    
 1. My cluster is secured by Kerberos, and I deploy my samza job from
 one of the cluster nodes.
   
   
2. My config file is in ~/.samza/conf/(yarn-site.xml, core-site.xml,
hdfs-site.xml)
   
   
3. The job is deployed successfully, and I can get the info such as:
   
ClientHelper [INFO] set package url to scheme: hdfs port: -1
  file:
/user/test/samzatest.tar.gz for application_1435680272316_0003
   
ClientHelper [INFO] set package size to 212924524 for
application_1435680272316_0003
   
   
   
I think the security setting is correct as it can get the file
 size
from HDFS.
   
   
 4. But I get the following error from the YARN job manager:
   
   
Application application_1435680272316_0003 failed 2 times due to
 AM
Container for appattempt_1435680272316_0003_02 exited with
  exitCode:
-1000
   
For more detailed output, check application tracking page:
http://cdh-namenode:8088/proxy/application_1435680272316_0003/Then,
   click
on links to logs of each attempt.
   
Diagnostics: Failed on local exception: java.io.IOException:
org.apache.hadoop.security.AccessControlException: Client cannot
authenticate via:[TOKEN, KERBEROS]; Host Details : local host is:
talend-cdh-datanode8/62.210.141.237; destination host is:
talend-cdh-namenode:8020;
   
java.io.IOException: Failed on local exception: java.io.IOException:
org.apache.hadoop.security.AccessControlException: Client cannot
authenticate via:[TOKEN, KERBEROS]; Host Details : local host is:
cdh-datanode8/62.210.141.237; destination host is:
   cdh-namenode:8020;
   
at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:772)
   
..
   
   
   
Does anyone know how to solve this?
   
   
Qi FU
   
  
 



 --
 Chen Song



Re: Review Request 36163: SAMZA-690: changelog topic creation should not be in the container code

2015-07-24 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36163/#review93019
---



samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
(line 80)
https://reviews.apache.org/r/36163/#comment147281

It should be config, not coordinatorSystemConfig, because we need to update 
the config from the stream.



samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
(line 101)
https://reviews.apache.org/r/36163/#comment147282

It's private because it's only used by this class.

Also, move this to the end of the class, because it is good to keep all the 
private methods together.



samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
(lines 121 - 125)
https://reviews.apache.org/r/36163/#comment147283

this can be simplified a little:

for ((storeName, systemStream) <- changeLogSystemStreams) {
  val systemFactory = config
    .getSystemFactory(systemStream.getSystem)
    .map(Util.getObj[SystemFactory](_))
    .getOrElse(throw new SamzaException(
      "A stream uses system %s, which is missing from the configuration." format systemStream.getSystem))
  val systemAdmin = Option(systemFactory.getAdmin(systemStream.getSystem, config))
    .getOrElse(throw new SamzaException("Unable to get systemAdmin for store "
      + storeName + " and systemStream " + systemStream))
  ...
}

Then we do not need lines 104-109 and 117-119.



samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
(line 126)
https://reviews.apache.org/r/36163/#comment147284

Add logs for the case where the topic already exists, and log the metadata 
information (like the original createStream code does).


- Yan Fang


On July 9, 2015, 2:39 p.m., Robert Zuljevic wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36163/
 ---
 
 (Updated July 9, 2015, 2:39 p.m.)
 
 
 Review request for samza.
 
 
 Repository: samza
 
 
 Description
 ---
 
 Removed trailing whitespaces
 
 
 Diffs
 -
 
   samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 
 7a588ebc99b5f07d533e48e10061a3075a63665a 
   
 samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
  249b8ae3a904716ea51a2b27c7701ac30d13b854 
   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
 8ee034a642a13af1b0fdb4ebbb3b2592bb8e2be1 
   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
 aeba61a95371faaba23c97d896321b8d95467f87 
   
 samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
  097f41062f3432ae9dc9a9737b48ed7b2f709f20 
   
 samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 
 8d54c4639fc226b34e64915935c1d90e5917af2e 
   
 samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
  d9ae187c7707673fe15c8cb7ea854e02c4a89a54 
   
 samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
  35086f54f526d5d88ad3bc312b71fce40260e7c6 
   samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java 
 b063366f0f60e401765a000fa265c59dee4a461e 
   
 samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
  1e936b42a5b9a4bfb43766c17847b2947ebdb21d 
 
 Diff: https://reviews.apache.org/r/36163/diff/
 
 
 Testing
 ---
 
 I wasn't really sure what kind of test (unit test / integration test) I 
 should write here, so any pointers would be greatly appreciated! I tested the 
 change with the unit/integration tests already available.
 
 
 Thanks,
 
 Robert Zuljevic
 




Re: Review Request 36545: SAMZA-682 Refactor Coordinator stream messages

2015-07-24 Thread József Márton Jung


 On July 23, 2015, 10:35 p.m., Yan Fang wrote:
  samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java,
   line 46
  https://reviews.apache.org/r/36545/diff/1/?file=1013337#file1013337line46
 
  To be consistent, let's go with TaskName, not a String.

Corrected.


 On July 23, 2015, 10:35 p.m., Yan Fang wrote:
  samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java,
   line 74
  https://reviews.apache.org/r/36545/diff/1/?file=1013337#file1013337line74
 
  Any reason that you do not want to use the TaskName class? TaskName 
  seems fine here.

Since the TaskName class consists of only one field called taskName, I thought 
that using only a String was sufficient. Changed back to TaskName.
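
(For reference, TaskName today is roughly the thin wrapper sketched below; this 
is an approximation, not the exact class. Keeping the type in the API still buys 
compile-time safety and room to grow beyond a single field.)

// Approximate shape of org.apache.samza.container.TaskName (sketch only).
public class TaskName implements Comparable<TaskName> {
  private final String taskName;

  public TaskName(String taskName) {
    this.taskName = taskName;
  }

  public String getTaskName() {
    return taskName;
  }

  @Override
  public int compareTo(TaskName other) {
    return taskName.compareTo(other.taskName);
  }
}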


 On July 23, 2015, 10:35 p.m., Yan Fang wrote:
  samza-core/src/main/java/org/apache/samza/container/LocalityManager.java, 
  line 64
  https://reviews.apache.org/r/36545/diff/1/?file=1013338#file1013338line64
 
  sourceSuffix is more descriptive.

Corrected.


 On July 23, 2015, 10:35 p.m., Yan Fang wrote:
  samza-core/src/main/java/org/apache/samza/manager/CoordinatorStreamManager.java,
   line 20
  https://reviews.apache.org/r/36545/diff/1/?file=1013349#file1013349line20
 
  I think it makes sense for this class to stay in its original package: 
  samza-core/src/main/java/org/apache/samza/coordinator/stream, because it's 
  only about the coordinator stream, not an overall manager for Samza.

Corrected, moved the class to 
samza-core/src/main/java/org/apache/samza/coordinator/stream package.


 On July 23, 2015, 10:35 p.m., Yan Fang wrote:
  samza-core/src/main/java/org/apache/samza/manager/CoordinatorStreamManager.java,
   line 29
  https://reviews.apache.org/r/36545/diff/1/?file=1013349#file1013349line29
 
  Add a little more in the doc. This class does not really manage the 
  coordinator stream; it is an abstract class that other stream managers are 
  meant to extend.
  
  Also, renaming it to AbstractCoordinatorStreamManager may be helpful too.

Renamed. Also Javadoc updated on the class.
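
For reference, a rough sketch of the shape of the extracted base class follows; 
the bodies and exact signatures are illustrative, not the actual patch:

package org.apache.samza.coordinator.stream;

import org.apache.samza.container.TaskName;
import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;

// Illustrative sketch: common plumbing shared by the checkpoint, changelog
// mapping, and locality managers.
public abstract class AbstractCoordinatorStreamManager {
  private final CoordinatorStreamSystemProducer coordinatorStreamProducer;
  private final CoordinatorStreamSystemConsumer coordinatorStreamConsumer;
  private final String source;

  protected AbstractCoordinatorStreamManager(CoordinatorStreamSystemProducer producer,
      CoordinatorStreamSystemConsumer consumer, String source) {
    this.coordinatorStreamProducer = producer;
    this.coordinatorStreamConsumer = consumer;
    this.source = source;
  }

  // Each concrete manager (checkpoint, changelog mapping, locality) decides
  // how a task registers itself.
  public abstract void register(TaskName taskName);

  // Shared plumbing: register with both ends of the coordinator stream.
  protected void registerCoordinatorStreams(String sourceSuffix) {
    coordinatorStreamProducer.register(source + sourceSuffix);
    coordinatorStreamConsumer.register();
  }

  // Shared plumbing: subclasses send their message subtypes through here.
  protected void send(CoordinatorStreamMessage message) {
    coordinatorStreamProducer.send(message);
  }
}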


 On July 23, 2015, 10:35 p.m., Yan Fang wrote:
  samza-core/src/main/java/org/apache/samza/manager/CoordinatorStreamManager.java,
   line 65
  https://reviews.apache.org/r/36545/diff/1/?file=1013349#file1013349line65
 
  typo, sends

Corrected.


 On July 23, 2015, 10:35 p.m., Yan Fang wrote:
  samza-core/src/main/java/org/apache/samza/manager/CoordinatorStreamManager.java,
   line 96
  https://reviews.apache.org/r/36545/diff/1/?file=1013349#file1013349line96
 
  no +

Removed.


 On July 23, 2015, 10:35 p.m., Yan Fang wrote:
  samza-core/src/main/java/org/apache/samza/manager/CoordinatorStreamManager.java,
   line 112
  https://reviews.apache.org/r/36545/diff/1/?file=1013349#file1013349line112
 
  I think taskName may be more general, in case we have more information 
  in the TaskName or other rules for registering. Just a personal idea.

I agree. Renamed to taskName.


 On July 23, 2015, 10:35 p.m., Yan Fang wrote:
  samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java,
   line 74
  https://reviews.apache.org/r/36545/diff/1/?file=1013350#file1013350line74
 
  going with the taskName is fine.

Corrected.


 On July 23, 2015, 10:35 p.m., Yan Fang wrote:
  samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala, 
  line 151
  https://reviews.apache.org/r/36545/diff/1/?file=1013351#file1013351line151
 
  If we use TaskName in the register method, we do not need to change 
  this one.

Corrected.


 On July 23, 2015, 10:35 p.m., Yan Fang wrote:
  samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala, 
  line 245
  https://reviews.apache.org/r/36545/diff/1/?file=1013352#file1013352line245
 
  same

Corrected.


 On July 23, 2015, 10:35 p.m., Yan Fang wrote:
  samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala,
   line 185
  https://reviews.apache.org/r/36545/diff/1/?file=1013353#file1013353line185
 
  same

Corrected.


- József Márton


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36545/#review92825
---


On July 16, 2015, 1:33 p.m., József Márton Jung wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36545/
 ---
 
 (Updated July 16, 2015, 1:33 p.m.)
 
 
 Review request for samza.
 
 
 Repository: samza
 
 
 Description
 ---
 
 The following has been refactored: 
 1. Static inner classes from CoordinatorStreamMessage have been extracted.
 2. Common functionality from CheckpointManager, ChangelogPartitionManager, and 
 LocalityManager has been moved to a base class.
 
 
 Diffs
 -
 
   checkstyle/import-control.xml eef3370 
   

Re: Review Request 36545: SAMZA-682 Refactor Coordinator stream messages

2015-07-24 Thread József Márton Jung

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36545/
---

(Updated July 24, 2015, 12:27 p.m.)


Review request for samza.


Changes
---

Corrections based on code review.


Repository: samza


Description
---

The following has been refactored: 
1. Static inner classes from CoordinatorStreamMessage have been extracted.
2. Common functionality from CheckpointManager, ChangelogPartitionManager, and 
LocalityManager has been moved to a base class.


Diffs (updated)
-

  checkstyle/import-control.xml 6654319 
  samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java 
7445996 
  samza-core/src/main/java/org/apache/samza/container/LocalityManager.java 
55c258f 
  
samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java
 PRE-CREATION 
  
samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
 e5ab4fb 
  
samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
 b1078bd 
  
samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
 92f8907 
  
samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
 f769756 
  
samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/CoordinatorStreamMessage.java
 PRE-CREATION 
  
samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/Delete.java
 PRE-CREATION 
  
samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetChangelogMapping.java
 PRE-CREATION 
  
samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetCheckpoint.java
 PRE-CREATION 
  
samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetConfig.java
 PRE-CREATION 
  
samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java
 PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/job/model/JobModel.java ad6387d 
  
samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java
 7d3409c 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
27b2517 
  samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
f621611 
  samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala 1c178a6 
  
samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
 e454593 
  
samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java
 ac26a01 
  
samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
 c25f6a7 
  
samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
 1ef07d0 
  
samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java
 c484660 
  samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 
84fdeaa 
  samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml 41303f7 

Diff: https://reviews.apache.org/r/36545/diff/


Testing
---

Tests have been updated.


Thanks,

József Márton Jung