Dear all,

My team just finished migrating all our topologies from 0.9.4 to Storm 1.1.0, 
and we are experiencing a "strange" issue with storm-hdfs when deploying them 
to our test cluster: after receiving 100 tuples, the SequenceFileBolt present 
in all our topologies stops processing anything until the tick tuple forcing a 
file sync comes in, which allows a hundred more tuples to be written to HDFS, 
and so on… This gives us a latency of x seconds (x being the interval between 
the sync tick tuples), which is not really good when the rest of the topology 
handles 20k events per second without any trouble.

The topologies roughly look like this:

Spout -> Bolt A -> SequenceFileBolt
              |-> Bolt B -> Bolt C -> Bolt D


The SequenceFileBolt is always a leaf of a short branch of our topologies, with 
a CountSyncPolicy set to 1000 for our test cluster and pretty much everything 
else left at the default values.

    public static IRichBolt build(String configPrefix, String topoName) throws Exception {
        String url = ConfigurationFactory.getConfiguration().getString(HdfsConstants.HDFS_URL);
        int count = ConfigurationFactory.getConfiguration().getInt("hdfs." + COUNT_SYNC);

        // Build the bolt: sync every `count` tuples, rotation/file naming from our config, no compression
        SequenceFileBolt bolt = new SequenceFileBolt()
                .withFsUrl(url)
                .withSyncPolicy(new CountSyncPolicy(count))
                .withSequenceFormat(new TrackingEventSequenceFormat())
                .withRotationPolicy(buildRotationPolicy(configPrefix))
                .withFileNameFormat(buildFileNameFormat(topoName))
                .withCompressionType(CompressionType.NONE);
        return bolt;
    }
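
For reference, the bolt is wired into the topology roughly like this (a simplified sketch; the component names, the parallelism hints, the "click" config prefix and the EventSpout/BoltA/BoltB classes are illustrative placeholders, not our actual code):

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("spout", new EventSpout(), 4);
    builder.setBolt("boltA", new BoltA(), 4).shuffleGrouping("spout");
    // short branch ending in the SequenceFileBolt built above
    builder.setBolt("hdfsBolt", build("click", topoName), 1).shuffleGrouping("boltA");
    // main processing branch
    builder.setBolt("boltB", new BoltB(), 4).shuffleGrouping("boltA");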


The topology.max.spout.pending is set to 2000 (the whole config for one of the 
topologies is available as an attachment).
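
For reference, here is a minimal sketch of how those values map to submission-time config (not our actual submission code; `builder` and `topoName` are placeholders, and the numbers mirror the attached config):

    Config conf = new Config();
    conf.setMaxSpoutPending(2000);   // topology.max.spout.pending
    conf.setNumWorkers(2);           // topology.workers
    conf.setNumAckers(4);            // topology.acker.executors
    StormSubmitter.submitTopology(topoName, conf, builder.createTopology());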

After reading the code of AbstractHdfsBolt, I understand that the tuples are 
now acked only after they are synced (this wasn't the case in storm-hdfs 
0.9.4, where the tuples were acked right after the write call), so it's 
important not to set the count policy too high compared to the 
max.spout.pending value. But I can't figure out why the bolt stops doing 
anything after 100 tuples, which causes the sync policy to never reach its 
limit and the bolt to wait for the sync tick tuple. I can get a "relatively" 
"correct" throughput with a CountSyncPolicy set to 100, but if I go to 101, 
I'm done for :/ I have been tinkering with the code and the configuration for 
two days now, and I can't figure it out.
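
To make the behaviour I'm describing concrete, here is a paraphrased sketch of the write/sync/ack flow as I read it (NOT the actual storm-hdfs source; isTick, writeTuple, sync, offset and the field names are simplified stand-ins):

    // Paraphrased sketch of the flow described above (not the real AbstractHdfsBolt code).
    private final List<Tuple> pending = new ArrayList<>();

    public void execute(Tuple tuple) {
        if (isTick(tuple)) {                    // periodic tick tuple forces a sync
            syncAndAckPending();
            return;
        }
        writeTuple(tuple);                      // appended to the open SequenceFile...
        pending.add(tuple);                     // ...but NOT acked yet
        if (syncPolicy.mark(tuple, offset)) {   // e.g. CountSyncPolicy(1000) reached
            syncAndAckPending();
        }
    }

    private void syncAndAckPending() {
        sync();                                 // hsync/flush the writer
        for (Tuple t : pending) {
            collector.ack(t);                   // tuples are only acked here
        }
        pending.clear();
        syncPolicy.reset();
    }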

If any of you has any idea about what we are doing wrong, your help would be 
greaaaaaatly appreciated!

Best regards,

--
Guillaume CONTE
CTO

57 rue de Villiers 92576 Neuilly-sur-Seine
Tel: +33 (0)1 85 63 51 30
[email protected] | www.keyade.com




backpressure.disruptor.high.watermark: 0.9
backpressure.disruptor.low.watermark: 0.4
client.blobstore.class: "org.apache.storm.blobstore.NimbusBlobStore"
dev.zookeeper.path: "/tmp/dev-storm-zookeeper"
drpc.authorizer.acl.filename: "drpc-auth-acl.yaml"
drpc.authorizer.acl.strict: false
drpc.childopts: "-Xmx768m"
drpc.http.creds.plugin: "org.apache.storm.security.auth.DefaultHttpCredentialsPlugin"
drpc.http.port: 3774
drpc.https.keystore.password: ""
drpc.https.keystore.type: "JKS"
drpc.https.port: -1
drpc.invocations.port: 3773
drpc.invocations.threads: 64
drpc.max_buffer_size: 1048576
drpc.port: 3772
drpc.queue.size: 128
drpc.request.timeout.secs: 600
drpc.servers: [
  "nimbus.keyade.pro",
  "storm.keyade.pro"
]
drpc.worker.threads: 64
java.library.path: "/usr/local/lib:/opt/local/lib:/usr/lib"
logs.users: null
logviewer.appender.name: "A1"
logviewer.childopts: "-Xmx128m"
logviewer.cleanup.age.mins: 10080
logviewer.max.per.worker.logs.size.mb: 2048
logviewer.max.sum.worker.logs.size.mb: 4096
logviewer.port: 8000
nimbus.blobstore.class: "org.apache.storm.blobstore.LocalFsBlobStore"
nimbus.blobstore.expiration.secs: 600
nimbus.childopts: "-Xmx1024m -Djava.net.preferIPv4Stack=true"
nimbus.cleanup.inbox.freq.secs: 600
nimbus.code.sync.freq.secs: 120
nimbus.credential.renewers.freq.secs: 600
nimbus.file.copy.expiration.secs: 600
nimbus.inbox.jar.expiration.secs: 3600
nimbus.monitor.freq.secs: 10
nimbus.queue.size: 100000
nimbus.seeds: [
  "nimbus.keyade.pro"
]
nimbus.supervisor.timeout.secs: 60
nimbus.task.launch.secs: 360
nimbus.task.timeout.secs: 60
nimbus.thrift.max_buffer_size: 1048576
nimbus.thrift.port: 6627
nimbus.thrift.threads: 256
nimbus.topology.validator: "org.apache.storm.nimbus.DefaultTopologyValidator"
pacemaker.auth.method: "NONE"
pacemaker.base.threads: 10
pacemaker.childopts: "-Xmx1024m"
pacemaker.host: "localhost"
pacemaker.kerberos.users: [ ]
pacemaker.max.threads: 50
pacemaker.port: 6699
pacemaker.thread.timeout: 10
resource.aware.scheduler.eviction.strategy: "org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy"
resource.aware.scheduler.priority.strategy: "org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy"
scheduler.display.resource: false
storm.auth.simple-acl.admins: [ ]
storm.auth.simple-acl.users: [ ]
storm.auth.simple-acl.users.commands: [ ]
storm.auth.simple-white-list.users: [ ]
storm.blobstore.acl.validation.enabled: false
storm.blobstore.inputstream.buffer.size.bytes: 65536
storm.blobstore.replication.factor: 3
storm.cluster.metrics.consumer.publish.interval.secs: 60
storm.cluster.mode: "distributed"
storm.cluster.state.store: "org.apache.storm.cluster_state.zookeeper_state_factory"
storm.codedistributor.class: "org.apache.storm.codedistributor.LocalFileSystemCodeDistributor"
storm.daemon.metrics.reporter.plugins: [
  "org.apache.storm.daemon.metrics.reporters.JmxPreparableReporter"
]
storm.disable.symlinks: false
storm.exhibitor.poll.uripath: "/exhibitor/v1/cluster/list"
storm.exhibitor.port: 8080
storm.group.mapping.service: "org.apache.storm.security.auth.ShellBasedGroupsMapping"
storm.group.mapping.service.cache.duration.secs: 120
storm.group.mapping.service.params: null
storm.health.check.dir: "healthchecks"
storm.health.check.timeout.ms: 5000
storm.id: "clickTopology-95-1496850052"
storm.local.dir: "/var/lib/storm"
storm.local.mode.zmq: false
storm.log4j2.conf.dir: "log4j2"
storm.messaging.netty.authentication: false
storm.messaging.netty.buffer_size: 5242880
storm.messaging.netty.client_worker_threads: 1
storm.messaging.netty.max_retries: 300
storm.messaging.netty.max_wait_ms: 1000
storm.messaging.netty.min_wait_ms: 100
storm.messaging.netty.server_worker_threads: 1
storm.messaging.netty.socket.backlog: 500
storm.messaging.netty.transfer.batch.size: 262144
storm.messaging.transport: "org.apache.storm.messaging.netty.Context"
storm.meta.serialization.delegate: "org.apache.storm.serialization.GzipThriftSerializationDelegate"
storm.network.topography.plugin: "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"
storm.nimbus.retry.interval.millis: 2000
storm.nimbus.retry.intervalceiling.millis: 60000
storm.nimbus.retry.times: 5
storm.principal.tolocal: "org.apache.storm.security.auth.DefaultPrincipalToLocal"
storm.thrift.socket.timeout.ms: 600000
storm.thrift.transport: "org.apache.storm.security.auth.SimpleTransportPlugin"
storm.workers.artifacts.dir: "workers-artifacts"
storm.zookeeper.auth.password: null
storm.zookeeper.auth.user: null
storm.zookeeper.connection.timeout: 15000
storm.zookeeper.port: 2181
storm.zookeeper.retry.interval: 1000
storm.zookeeper.retry.intervalceiling.millis: 30000
storm.zookeeper.retry.times: 5
storm.zookeeper.root: "/storm"
storm.zookeeper.servers: [
  "zookeeper.keyade.pro",
  "zookeeper2.keyade.pro",
  "namenode2.keyade.pro"
]
storm.zookeeper.session.timeout: 20000
storm.zookeeper.superACL: null
supervisor.blobstore.class: "org.apache.storm.blobstore.NimbusBlobStore"
supervisor.blobstore.download.max_retries: 3
supervisor.blobstore.download.thread.count: 5
supervisor.childopts: "-Xmx256m -Djava.net.preferIPv4Stack=true"
supervisor.cpu.capacity: 400
supervisor.enable: true
supervisor.heartbeat.frequency.secs: 5
supervisor.localizer.cache.target.size.mb: 10240
supervisor.localizer.cleanup.interval.ms: 600000
supervisor.memory.capacity.mb: 3072
supervisor.monitor.frequency.secs: 3
supervisor.run.worker.as.user: false
supervisor.slots.ports: [
  6700,
  6701,
  6702,
  6703
]
supervisor.supervisors: [ ]
supervisor.supervisors.commands: [ ]
supervisor.worker.shutdown.sleep.secs: 3
supervisor.worker.start.timeout.secs: 120
supervisor.worker.timeout.secs: 30
task.backpressure.poll.secs: 30
task.credentials.poll.secs: 30
task.heartbeat.frequency.secs: 3
task.refresh.poll.secs: 10
topology.acker.executors: 4
topology.backpressure.enable: false
topology.bolts.outgoing.overflow.buffer.enable: false
topology.builtin.metrics.bucket.size.secs: 60
topology.classpath: null
topology.component.cpu.pcore.percent: 10
topology.component.resources.offheap.memory.mb: 0
topology.component.resources.onheap.memory.mb: 128
topology.debug: false
topology.disable.loadaware.messaging: false
topology.disruptor.batch.size: 100
topology.disruptor.batch.timeout.millis: 1
topology.disruptor.wait.timeout.millis: 1000
topology.enable.message.timeouts: true
topology.environment: null
topology.error.throttle.interval.secs: 10
topology.eventlogger.executors: 0
topology.executor.receive.buffer.size: 1024
topology.executor.send.buffer.size: 1024
topology.fall.back.on.java.serialization: true
topology.kryo.decorators: [ ]
topology.kryo.factory: "org.apache.storm.serialization.DefaultKryoFactory"
topology.kryo.register: null
topology.max.error.report.per.interval: 5
topology.max.replication.wait.time.sec: 60
topology.max.spout.pending: 2000
topology.max.task.parallelism: 200
topology.message.timeout.secs: 30
topology.min.replication.count: 1
topology.multilang.serializer: "org.apache.storm.multilang.JsonSerializer"
topology.name: "clickTopology"
topology.priority: 29
topology.scheduler.strategy: "org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy"
topology.shellbolt.max.pending: 100
topology.skip.missing.kryo.registrations: false
topology.sleep.spout.wait.strategy.time.ms: 1
topology.spout.wait.strategy: "org.apache.storm.spout.SleepSpoutWaitStrategy"
topology.state.checkpoint.interval.ms: 1000
topology.state.synchronization.timeout.secs: 60
topology.stats.sample.rate: 0.05
topology.submitter.principal: ""
topology.submitter.user: "storm"
topology.tasks: null
topology.testing.always.try.serialize: false
topology.tick.tuple.freq.secs: null
topology.transfer.buffer.size: 1024
topology.trident.batch.emit.interval.millis: 500
topology.tuple.serializer: "org.apache.storm.serialization.types.ListDelegateSerializer"
topology.users: [ ]
topology.worker.childopts: null
topology.worker.logwriter.childopts: "-Xmx64m"
topology.worker.max.heap.size.mb: 768
topology.worker.receiver.thread.count: 1
topology.worker.shared.thread.pool.size: 4
topology.workers: 2
transactional.zookeeper.port: null
transactional.zookeeper.root: "/transactional"
transactional.zookeeper.servers: null
ui.actions.enabled: true
ui.childopts: "-Xmx768m"
ui.filter: null
ui.filter.params: null
ui.header.buffer.bytes: 4096
ui.host: "0.0.0.0"
ui.http.creds.plugin: "org.apache.storm.security.auth.DefaultHttpCredentialsPlugin"
ui.http.x-frame-options: "DENY"
ui.port: 6999
ui.users: null
worker.childopts: " -Xmx1024m -Xms1024m -Xss256k -XX:MaxPermSize=128m 
-XX:PermSize=96m -XX:NewSize=468m -XX:MaxNewSize=468m 
-XX:MaxTenuringThreshold=2 -XX:SurvivorRatio=6 -XX:+UseParNewGC 
-XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled 
-XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly 
-server -XX:+AggressiveOpts -XX:+UseCompressedOops 
-Xloggc:%STORM_HOME%/logs/gc-worker-%ID%.log -verbose:gc -XX:GCLogFileSize=1m 
-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:+PrintGCDetails 
-XX:+PrintHeapAtGC -XX:+PrintGCTimeStamps -XX:+PrintClassHistogram 
-XX:+PrintTenuringDistribution -XX:-PrintGCApplicationStoppedTime 
-Djava.net.preferIPv4Stack=true"
worker.gc.childopts: ""
worker.heap.memory.mb: 768
worker.heartbeat.frequency.secs: 5
worker.log.level.reset.poll.secs: 30
worker.profiler.childopts: "-XX:+UnlockCommercialFeatures -XX:+FlightRecorder"
worker.profiler.command: "flight.bash"
worker.profiler.enabled: false
zmq.hwm: 0
zmq.linger.millis: 5000
zmq.threads: 1
