Hi Navina,

Thanks for the clear explanation. I understand it better now.

ShuQi

————————
舒琦
Address: 6F, Unit 1, Building A4, Lugu Enterprise Plaza, No. 27 Wenxuan Road, Yuelu District, Changsha
Website: http://www.eefung.com
Weibo: http://weibo.com/eefung
Postal code: 410013
Phone: 400-677-0986
Fax: 0731-88519609

> On Sep 9, 2016, at 04:19, Navina Ramesh <nram...@linkedin.com.INVALID> wrote:
> 
> Hi ShuQi,
> 
> Auto-creation of streams depends on your Kafka server configuration. For
> the coordinator stream and the checkpoint stream, Samza explicitly creates
> the stream with 1 partition before publishing to it. This doesn't apply to
> the metrics stream. So, if auto-creation is turned off on the Kafka server,
> you have to create the metrics stream manually, as you did.
> 
> Glad you figured it out.
> 
> Cheers!
> Navina
> 
> On Wed, Sep 7, 2016 at 6:37 PM, 舒琦 <sh...@eefung.com> wrote:
> 
>> Hi,
>> 
>> Thanks for your help.
>> 
>> I think the checkpoint stream and the coordinator stream are auto-created
>> per job when using Kafka, but the metrics stream is not.
>> 
>> After I manually created the metrics stream in Kafka, the metrics are
>> written to the stream.
>> 
>> 
>> ShuQi
>> 
>>> On Sep 7, 2016, at 23:15, Jagadish Venkatraman <jagadish1...@gmail.com> wrote:
>>> 
>>> Can you run your program in DEBUG log-level? Does sending the metric to
>>> the producer fail? Is the metric reporter thread showing an exception?
>>> (check the stderr file too)
>>> 
>>> Producing to a Kafka topic should usually auto-create it.
>>> 
>>> On Wed, Sep 7, 2016 at 2:09 AM, 舒琦 <sh...@eefung.com> wrote:
>>> 
>>>> Hi,
>>>> 
>>>> My Samza job has the following metrics configuration:
>>>> 
>>>> serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory
>>>>
>>>> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>>>> systems.kafka.consumer.zookeeper.connect=zk11:3181,zk12:3181,zk13:3181
>>>> systems.kafka.producer.bootstrap.servers=buka1:9096,buka2:9096,buka3:9096
>>>>
>>>> systems.kafka.streams.samza-metrics.samza.msg.serde=metrics
>>>>
>>>> metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
>>>> metrics.reporter.snapshot.stream=kafka.samza-metrics
>>>> metrics.reporters=snapshot
>>>> 
>>>> The job is deployed on YARN. After the job started, everything looked
>>>> fine: I can find the coordinator stream and the checkpoint stream in the
>>>> same Kafka cluster, but there is no samza-metrics stream.
>>>> 
>>>> One of the container logs:
>>>> 
>>>> 2016-09-07 16:32:31.947 [main] MetricsSnapshotReporterFactory [WARN] Unable to find implementation version in jar's meta info. Defaulting to 0.0.1.
>>>> 2016-09-07 16:32:31.948 [main] MetricsSnapshotReporterFactory [INFO] Got system stream SystemStream [system=kafka, stream=samza-metrics].
>>>> 2016-09-07 16:32:31.949 [main] MetricsSnapshotReporterFactory [INFO] Got system factory org.apache.samza.system.kafka.KafkaSystemFactory@1eed1f10.
>>>> 2016-09-07 16:32:31.950 [main] MetricsSnapshotReporterFactory [INFO] Got producer org.apache.samza.system.kafka.KafkaSystemProducer@16d96b45.
>>>> 2016-09-07 16:32:31.951 [main] MetricsSnapshotReporterFactory [INFO] Got serde org.apache.samza.serializers.MetricsSnapshotSerde@569f129d.
>>>> 2016-09-07 16:32:31.952 [main] MetricsSnapshotReporterFactory [INFO] Setting polling interval to 60
>>>> 2016-09-07 16:32:31.954 [main] MetricsSnapshotReporter [INFO] got metrics snapshot reporter properties [job name: data-status-persistent-hstore, job id: 1, containerName: samza-container-1, version: 0.0.1, samzaVersion: 0.10.1, host: store116, pollingInterval 60]
>>>> 2016-09-07 16:32:31.955 [main] MetricsSnapshotReporter [INFO] Registering MetricsSnapshotReporterFactory with producer.
>>>> 2016-09-07 16:32:31.955 [main] SamzaContainer$ [INFO] Got metrics reporters: Set(snapshot)
>>>> 
>>>> 2016-09-07 16:32:32.016 [main] MetricsSnapshotReporter [INFO] Registering TaskName-Partition 7 with producer.
>>>> 2016-09-07 16:32:32.016 [main] MetricsSnapshotReporter [INFO] Registering TaskName-Partition 1 with producer.
>>>> 2016-09-07 16:32:32.016 [main] MetricsSnapshotReporter [INFO] Registering TaskName-Partition 5 with producer.
>>>> 2016-09-07 16:32:32.016 [main] MetricsSnapshotReporter [INFO] Registering TaskName-Partition 3 with producer.
>>>> 2016-09-07 16:32:32.017 [main] SamzaContainer [INFO] Starting JVM metrics.
>>>> 2016-09-07 16:32:32.017 [main] SamzaContainer [INFO] Starting metrics reporters.
>>>> 2016-09-07 16:32:32.018 [main] MetricsSnapshotReporter [INFO] Registering samza-container-1 with producer.
>>>> 2016-09-07 16:32:32.018 [main] MetricsSnapshotReporter [INFO] Starting producer.
>>>> 2016-09-07 16:32:32.018 [main] MetricsSnapshotReporter [INFO] Starting reporter timer.
>>>> 2016-09-07 16:32:32.019 [main] SamzaContainer [INFO] Registering task instances with offsets.
>>>> 2016-09-07 16:32:32.022 [main] SamzaContainer [INFO] Starting offset manager.
>>>> 
>>>> 2016-09-07 16:32:32.212 [SAMZA-METRIC-SNAPSHOT-REPORTER] KafkaSystemProducer [INFO] Creating a new producer for system kafka.
>>>> 2016-09-07 16:32:32.221 [SAMZA-METRIC-SNAPSHOT-REPORTER] ProducerConfig [INFO] ProducerConfig values:
>>>>       value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
>>>>       key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
>>>>       block.on.buffer.full = true
>>>>       retry.backoff.ms = 100
>>>>       buffer.memory = 33554432
>>>>       batch.size = 16384
>>>>       metrics.sample.window.ms = 30000
>>>>       metadata.max.age.ms = 300000
>>>>       receive.buffer.bytes = 32768
>>>>       timeout.ms = 30000
>>>>       max.in.flight.requests.per.connection = 1
>>>>       bootstrap.servers = [buka1:9096, buka2:9096, buka3:9096]
>>>>       metric.reporters = []
>>>>       client.id = samza_producer-data_status_persistent_hstore-1-1473237151949-4
>>>>       compression.type = none
>>>>       retries = 2147483647
>>>>       max.request.size = 1048576
>>>>       send.buffer.bytes = 131072
>>>>       acks = 1
>>>>       reconnect.backoff.ms = 10
>>>>       linger.ms = 0
>>>>       metrics.num.samples = 2
>>>>       metadata.fetch.timeout.ms = 60000
>>>> 
>>>> Thanks.
>>>> 
>>>> 
>>>> 
>>> 
>>> 
>>> --
>>> Jagadish V,
>>> Graduate Student,
>>> Department of Computer Science,
>>> Stanford University
>> 
>> 
> 
> 
> -- 
> Navina R.

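The manual step discussed in this thread (creating the metrics topic by hand when broker-side auto-creation, `auto.create.topics.enable`, is turned off) can be sketched roughly as follows. This is only an illustration, not something Samza does for you: the helper names `parse_properties` and `metrics_topic_commands` are made up here, the partition count and replication factor of 1 are assumed defaults, and the generated command assumes the ZooKeeper-based `kafka-topics.sh` tool that shipped with Kafka releases of that period. It reads the job's metrics settings and prints one `--create` command per metrics reporter stream.

```python
import shlex


def parse_properties(text):
    """Parse simple key=value lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def metrics_topic_commands(props, partitions=1, replication_factor=1):
    """Build one kafka-topics.sh --create command per metrics reporter stream.

    partitions and replication_factor are assumed defaults; adjust them
    for your cluster.
    """
    commands = []
    reporters = [r.strip() for r in props.get("metrics.reporters", "").split(",")]
    for name in filter(None, reporters):
        stream = props.get("metrics.reporter.%s.stream" % name)
        if not stream or "." not in stream:
            continue
        # The stream value has the form <system>.<topic>, e.g. kafka.samza-metrics.
        system, topic = stream.split(".", 1)
        zookeeper = props.get("systems.%s.consumer.zookeeper.connect" % system)
        if not zookeeper:
            continue
        args = [
            "kafka-topics.sh", "--create",
            "--zookeeper", zookeeper,
            "--topic", topic,
            "--partitions", str(partitions),
            "--replication-factor", str(replication_factor),
        ]
        commands.append(" ".join(shlex.quote(arg) for arg in args))
    return commands


# Relevant lines taken from the job configuration quoted in this thread.
CONFIG = """
systems.kafka.consumer.zookeeper.connect=zk11:3181,zk12:3181,zk13:3181
metrics.reporter.snapshot.stream=kafka.samza-metrics
metrics.reporters=snapshot
"""

if __name__ == "__main__":
    for command in metrics_topic_commands(parse_properties(CONFIG)):
        print(command)
```

The script only prints the commands so they can be reviewed before being run against the cluster. Alternatively, enabling `auto.create.topics.enable` on the brokers lets the first produce request create the topic.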