Another pointer is to run the job with DEBUG level turned on and share the
logs with us.
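
For example, here is a minimal sketch of a logging configuration that turns
DEBUG on. It assumes the job logs through log4j 1.x and picks up a properties
file; the appender name "console" and the pattern are placeholders. If your
deployment loads a log4j.xml instead, set the equivalent levels there.

# Assumption: log4j 1.x properties format; adapt if the job loads log4j.xml.
log4j.rootLogger=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c - %m%n
# DEBUG for Samza itself and for the Kafka client classes.
log4j.logger.org.apache.samza=DEBUG
log4j.logger.kafka=DEBUG
log4j.logger.org.apache.kafka=DEBUG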

On Fri, Feb 5, 2016 at 5:05 PM, Jagadish Venkatraman <jagadish1...@gmail.com>
wrote:

> Hey Jason,
>
> Can you share the entire container log? It will be useful to find out what
> went wrong.
> If this is a non-YARN job, it will also be useful if you share the JobRunner
> logs.
>
> Thanks,
> Jagadish
>
> On Fri, Feb 5, 2016 at 4:33 PM, Jason Erickson <ja...@stormpath.com>
> wrote:
>
>> Is the Kafka producer configuration different from the Samza configuration
>> of the Samza task that references the store? If not, here are those
>> configuration values. The changelog in question is
>> resourceStore-changelog.
>>
>> # Job
>> job.factory.class=org.apache.samza.job.local.ThreadJobFactory
>> job.name=resource_normalizer
>>
>> job.coordinator.system=kafka
>>
>> # Task
>>
>> task.class=com.foo.blazer.resource.normalizer.samza.ResourceNormalizerSamzaTask
>> task.inputs=kafka.com.foo.iam.indexing.resource.mutation
>> task.window.ms=10000
>>
>> task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
>> task.checkpoint.system=kafka
>> # Normally, this would be 3, but we have only one broker.
>> task.checkpoint.replication.factor=3
>> task.checkpoint.skip-migration=true
>>
>> # Serializers
>>
>> serializers.registry.byte.class=org.apache.samza.serializers.ByteSerdeFactory
>>
>> serializers.registry.entity.class=com.foo.blazer.resource.normalizer.serde.ResourceEventEntitySerdeFactory
>>
>> serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
>>
>> serializers.registry.int.class=org.apache.samza.serializers.IntegerSerdeFactory
>>
>> serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory
>>
>> # Systems
>>
>> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>> systems.kafka.samza.msg.serde=byte
>> systems.kafka.samza.offset.default=oldest
>> systems.kafka.consumer.zookeeper.connect=${ZK_NODES}/${ZK_ROOT}
>> systems.kafka.consumer.auto.offset.reset=smallest
>> systems.kafka.producer.bootstrap.servers=${KAFKA_NODES}
>> systems.kafka.producer.max.request.size=52428800
>> systems.kafka.streams.metrics.samza.msg.serde=metrics
>>
>> # Metrics
>>
>> metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactor
>> metrics.reporter.snapshot.stream=kafka.metrics
>>
>> metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
>> metrics.reporters=snapshot,jmx
>>
>> # Stores
>>
>> stores.resourceStore.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
>> stores.resourceStore.changelog=kafka.resourceStore-changelog
>> stores.resourceStore.changelog.replication.factor=3
>>
>> stores.resourceStore.key.serde=string
>> stores.resourceStore.msg.serde=entity
>>
>>
>> stores.deletedFlagStore.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
>> stores.deletedFlagStore.changelog=kafka.deletedFlagStore-changelog
>> stores.deletedFlagStore.changelog.replication.factor=3
>>
>> stores.deletedFlagStore.key.serde=string
>> stores.deletedFlagStore.msg.serde=int
>>
>>
>>
>> We do not get a stack trace from the Samza task itself; it just never seems
>> to fully start. However, if we use kafka-console-consumer to try to examine
>> the changelog, we get this:
>>
>> [2016-02-02 22:10:11,252] ERROR Error processing message, terminating
>> consumer process:  (kafka.tools.ConsoleConsumer$)
>> kafka.common.MessageSizeTooLargeException: Found a message larger than the
>> maximum fetch size of this consumer on topic resourceStore-changelog
>> partition 7 at fetch offset 0. Increase the fetch size, or decrease the
>> maximum message size the broker will allow.
>> at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:90)
>> at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
>> at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
>> at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
>> at kafka.consumer.OldConsumer.receive(BaseConsumer.scala:79)
>> at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:110)
>> at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:69)
>> at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:47)
>> at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
>>
>>
>> On Fri, Feb 5, 2016 at 3:42 PM, Ramesh Bhojan <rbhojan.soc...@gmail.com>
>> wrote:
>>
>> > Jason,
>> > Can we please share more information about the exact stack trace and the
>> > job configuration, especially the Kafka producer configuration for the
>> > changelog system, as requested by Yi Pan?
>> >
>> > Regards,
>> > Ramesh
>> >
>> > On Thu, Feb 4, 2016 at 11:56 AM, Ramesh Bhojan <rbhojan.soc...@gmail.com>
>> > wrote:
>> >
>> >> Dear team @ Samza,
>> >> I would really appreciate some help with the following question posted on
>> >> Stack Overflow:
>> >>
>> >> http://stackoverflow.com/questions/35168641/is-there-a-configuration-setting-to-allow-large-values-in-my-samza-store
>> >>
>> >> Thanks,
>> >> Ramesh
>> >>
>> >
>> >
>>
>>
>> --
>> Thanks,
>>
>> Jason Erickson
>>
>
>
>
> --
> Jagadish V,
> Graduate Student,
> Department of Computer Science,
> Stanford University
>



-- 
Jagadish V,
Graduate Student,
Department of Computer Science,
Stanford University
