Hi all,
I have two Samza tasks, HttpDemoParserStreamTask and
HttpDemoStatsStreamTask. They are almost the same, except that the output
topic name and the task name in the properties file are different. The
second one does not create its output topic, and I am wondering how I
should debug it.
More details are listed below.
Any help is highly appreciated.
Sincerely,
Selina
HttpDemoParserStreamTask currently runs well.
HttpDemoStatsStreamTask, however, writes its log without any exception to
deploy/yarn/logs/userlogs/application_1438043584310_0001/container_1438043584310_0001_01_000002/samza-container-0.log
The last record, shown below, is correct, but the topic
"demo-stats-temp" was never created.
--------------------------------------
2015-07-27 17:34:48 HttpDemoParserStreamTask [INFO] key=CAESEAbQ1pC2TBvb-4SLDjMqsZ8: message={"timestamp":"2015-07-27 14:30:02:987","date":"06-21-2015","id":"CAESEAbQ1pC2TBvb-4SLDjMqsZ8","ip":"22.231.113.69","browser":"Chrome","postalCode":"95131","url":"http://somthing.sample2.com/whatever?someinfo\u003dwekio2icicicnenneniadidi","language":"ENG","mobileBrand":"Samsung","carrierName":"Tmobile","deviceName":"Samsung Galaxy S6","operationSystem":"Android 5.0.2","screenSize":"5.1-inch","resolution":"1440p","campaignId":"65681290456292569","count":"5607"}
-------------------The demo-stats.properties file-----------------------------
# Job
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=demo-stats-tmp
task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
task.checkpoint.system=kafka
# Normally, this would be 3, but we have only one broker.
task.checkpoint.replication.factor=1
# YARN
yarn.package.path=file:///Users/selina/IdeaProjects/samza-Demo/target/hello-samza-0.9.1-dist.tar.gz
# Task
task.class=samza.http.demo.task.HttpDemoParserStreamTask
task.inputs=kafka.http-demo
# Serializers
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
# Kafka System
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.samza.msg.serde=string
systems.kafka.samza.key.serde=string
systems.kafka.consumer.zookeeper.connect=localhost:2181/
systems.kafka.producer.bootstrap.servers=localhost:9092
# stream from the beginning
#systems.kafka.consumer.auto.offset.reset=smallest
#http-demo from the oldest
systems.kafka.http-demo.samza.offset.default=oldest
# all stream from the oldest
systems.kafka.streams.http-demo.samza.offset.default=oldest
systems.kafka.streams.http-demo.samza.reset.offset=true
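
When two jobs share almost the same properties file, one quick check is to print the settings that decide which task class runs and which topic it reads. The sketch below does this with the standard java.util.Properties loader. JobConfigCheck and its methods are hypothetical helpers, not part of Samza, and the three inline lines are copied from the file above; a real check would read the file from disk.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper, not part of Samza: prints the settings that decide
// which task class a job runs and which topic it reads.
public class JobConfigCheck {

    // Parse properties text with the standard java.util.Properties loader.
    static Properties parse(String text) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(text));
        return props;
    }

    // Return the simple class name from a fully qualified task.class value.
    static String simpleTaskName(Properties props) {
        String taskClass = props.getProperty("task.class", "");
        return taskClass.substring(taskClass.lastIndexOf('.') + 1);
    }

    public static void main(String[] args) throws IOException {
        // Three lines copied from the properties file above.
        String text = String.join("\n",
                "job.name=demo-stats-tmp",
                "task.class=samza.http.demo.task.HttpDemoParserStreamTask",
                "task.inputs=kafka.http-demo");
        Properties props = parse(text);
        System.out.println("job.name    = " + props.getProperty("job.name"));
        System.out.println("task.class  = " + simpleTaskName(props));
        System.out.println("task.inputs = " + props.getProperty("task.inputs"));
    }
}
```

Comparing the printed task class with the logger name in the container log is one way to confirm that the job is running the task you expect.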
--------------------HttpDemoStatsStreamTask class----------------------------
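import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HttpDemoStatsStreamTask implements StreamTask {
    // Output topic
    private static final SystemStream OUTPUT_STREAM = new SystemStream("kafka", "demo-stats-temp");

    Logger logger = LoggerFactory.getLogger(HttpDemoStatsStreamTask.class);

    @SuppressWarnings("unchecked")
    @Override
    public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
            TaskCoordinator coordinator) throws Exception {
        String key = (String) envelope.getKey();
        String message = envelope.getMessage().toString();
        // Log each record, then forward the message unchanged to the output topic
        logger.info("key=" + key + ": message=" + message);
        collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message));
    }
}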
-----Tail of __samza_checkpoint_ver_1_for_demo-stats-tmp_1 topic--------------
{"Partition 0":0}
{"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"0","stream":"http-demo"}}
{"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
{"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
{"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
{"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
{"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
{"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
{"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
{"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
{"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
{"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
{"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
{"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
{"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
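
To see whether the checkpointed offset is still moving, the tail above can be scanned for its "offset" values. The sketch below is one way to do that with a regular expression. CheckpointOffsets and its methods are hypothetical names, and the three sample lines are shortened copies of the records above. An offset that stops changing may only mean that no new messages are arriving on the input topic, so treat the result as a hint, not a diagnosis.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Samza: extracts the "offset" values from
// checkpoint records such as the ones printed above.
public class CheckpointOffsets {

    private static final Pattern OFFSET = Pattern.compile("\"offset\":\"(\\d+)\"");

    // Collect the offset from every line that contains one, in order.
    static List<Long> offsets(List<String> lines) {
        List<Long> result = new ArrayList<>();
        for (String line : lines) {
            Matcher m = OFFSET.matcher(line);
            if (m.find()) {
                result.add(Long.parseLong(m.group(1)));
            }
        }
        return result;
    }

    // True if the most recent offset is larger than the one before it.
    static boolean stillAdvancing(List<Long> offsets) {
        int n = offsets.size();
        return n >= 2 && offsets.get(n - 1) > offsets.get(n - 2);
    }

    public static void main(String[] args) {
        // Shortened copies of the checkpoint records shown above.
        List<String> lines = Arrays.asList(
                "{\"system\":\"kafka\",\"partition\":\"0\",\"offset\":\"0\",\"stream\":\"http-demo\"}",
                "{\"system\":\"kafka\",\"partition\":\"0\",\"offset\":\"12601\",\"stream\":\"http-demo\"}",
                "{\"system\":\"kafka\",\"partition\":\"0\",\"offset\":\"12601\",\"stream\":\"http-demo\"}");
        List<Long> found = offsets(lines);
        System.out.println("offsets = " + found);
        System.out.println("still advancing = " + stillAdvancing(found));
    }
}
```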