[ https://issues.apache.org/jira/browse/STORM-1199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15089537#comment-15089537 ]
ASF GitHub Bot commented on STORM-1199:
---------------------------------------

Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/936#discussion_r49211864

    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java ---
    @@ -0,0 +1,739 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.hdfs.spout;
    +
    +import java.io.IOException;
    +import java.lang.reflect.Constructor;
    +import java.net.URI;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Timer;
    +import java.util.TimerTask;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +
    +import backtype.storm.Config;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.storm.hdfs.common.HdfsUtils;
    +import org.apache.storm.hdfs.common.security.HdfsSecurityUtil;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import backtype.storm.spout.SpoutOutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.topology.base.BaseRichSpout;
    +import backtype.storm.tuple.Fields;
    +
    +public class HdfsSpout extends BaseRichSpout {
    +
    +  // user configurable
    +  private String hdfsUri;        // required
    +  private String readerType;     // required
    +  private Fields outputFields;   // required
    +  private Path sourceDirPath;    // required
    +  private Path archiveDirPath;   // required
    +  private Path badFilesDirPath;  // required
    +  private Path lockDirPath;
    +
    +  private int commitFrequencyCount = Configs.DEFAULT_COMMIT_FREQ_COUNT;
    +  private int commitFrequencySec = Configs.DEFAULT_COMMIT_FREQ_SEC;
    +  private int maxOutstanding = Configs.DEFAULT_MAX_OUTSTANDING;
    +  private int lockTimeoutSec = Configs.DEFAULT_LOCK_TIMEOUT;
    +  private boolean clocksInSync = true;
    +
    +  private String inprogress_suffix = ".inprogress";
    +  private String ignoreSuffix = ".ignore";
    +
    +  // other members
    +  private static final Logger LOG = LoggerFactory.getLogger(HdfsSpout.class);
    +
    +  private ProgressTracker tracker = null;
    +
    +  private FileSystem hdfs;
    +  private FileReader reader;
    +
    +  private SpoutOutputCollector collector;
    +  HashMap<MessageId, List<Object>> inflight = new HashMap<>();
    +  LinkedBlockingQueue<HdfsUtils.Pair<MessageId, List<Object>>> retryList = new LinkedBlockingQueue<>();
    +
    +  private Configuration hdfsConfig;
    +
    +  private Map conf = null;
    +  private FileLock lock;
    +  private String spoutId = null;
    +
    +  HdfsUtils.Pair<Path, FileLock.LogEntry> lastExpiredLock = null;
    +  private long lastExpiredLockTime = 0;
    +
    +  private long tupleCounter = 0;
    +  private boolean ackEnabled = false;
    +  private int acksSinceLastCommit = 0;
    +  private final AtomicBoolean commitTimeElapsed = new AtomicBoolean(false);
    +  private Timer commitTimer;
    +  private boolean fileReadCompletely = true;
    +
    +  private String configKey = Configs.DEFAULT_HDFS_CONFIG_KEY;  // key for HDFS Kerberos configs
    +
    +  public HdfsSpout() {
    +  }
    +
    +  /** Names of the output fields. The number of fields depends upon the reader type. */
    +  public HdfsSpout withOutputFields(String... fields) {
    +    outputFields = new Fields(fields);
    +    return this;
    +  }
    +
    +  /** Sets the key under which the HDFS options are placed (similar to the HDFS bolt).
    +   *  The default key name is 'hdfs.config'. */
    +  public HdfsSpout withConfigKey(String configKey) {
    +    this.configKey = configKey;
    +    return this;
    +  }
    +
    +  public Path getLockDirPath() {
    +    return lockDirPath;
    +  }
    +
    +  public SpoutOutputCollector getCollector() {
    +    return collector;
    +  }
    +
    +  public void nextTuple() {
    +    LOG.trace("Next Tuple {}", spoutId);
    +    // 1) First re-emit any previously failed tuples (from retryList)
    +    if (!retryList.isEmpty()) {
    +      LOG.debug("Sending tuple from retry list");
    +      HdfsUtils.Pair<MessageId, List<Object>> pair = retryList.remove();
    +      emitData(pair.getValue(), pair.getKey());
    +      return;
    +    }
    +
    +    if (ackEnabled && tracker.size() >= maxOutstanding) {
    +      LOG.warn("Waiting for more ACKs before generating new tuples. "
    +          + "Progress tracker size has reached limit {}, SpoutID {}",
    +          maxOutstanding, spoutId);
    +      // Don't emit anything; allow the configured spout wait strategy to kick in
    +      return;
    +    }
    +
    +    // 2) If no failed tuples to be retried, then send tuples from hdfs
    +    while (true) {
    +      try {
    +        // 3) Select a new file if one is not open already
    +        if (reader == null) {
    +          reader = pickNextFile();
    +          if (reader == null) {
    +            LOG.debug("Currently no new files to process under: " + sourceDirPath);
    +            return;
    +          } else {
    +            fileReadCompletely = false;
    +          }
    +        }
    +        if (fileReadCompletely) {  // wait for more ACKs before proceeding
    +          return;
    +        }
    +        // 4) Read record from file, emit to collector and record progress
    +        List<Object> tuple = reader.next();
    +        if (tuple != null) {
    +          fileReadCompletely = false;
    +          ++tupleCounter;
    +          MessageId msgId = new MessageId(tupleCounter, reader.getFilePath(), reader.getFileOffset());
    +          emitData(tuple, msgId);
    +
    +          if (!ackEnabled) {
    --- End diff --

    If acks are disabled, ack is called by Storm as part of the call to emit. I think you may have extra logic in here that you do not need.
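    To make the suggestion concrete, below is a minimal sketch of funneling all post-emit bookkeeping through ack(). This is not the PR's code: the class name AckFunneledSpoutSketch and the trimmed emitData() helper are illustrative, and the inflight map stands in for the spout's real progress tracking.

        package org.apache.storm.hdfs.spout;

        import java.util.HashMap;
        import java.util.List;

        import backtype.storm.spout.SpoutOutputCollector;
        import backtype.storm.topology.base.BaseRichSpout;

        /** Sketch only: one post-emit path shared by acked and unacked modes. */
        public abstract class AckFunneledSpoutSketch extends BaseRichSpout {

          private final HashMap<Object, List<Object>> inflight = new HashMap<>();
          protected SpoutOutputCollector collector;  // assigned in open(), omitted here

          /** Emit with a message id and track it; no separate !ackEnabled branch. */
          protected void emitData(List<Object> tuple, Object msgId) {
            inflight.put(msgId, tuple);
            // With acking disabled (Config.TOPOLOGY_ACKER_EXECUTORS = 0), the spout
            // executor invokes ack(msgId) as part of this emit() call, so the
            // cleanup in ack() below runs in both modes.
            collector.emit(tuple, msgId);
          }

          @Override
          public void ack(Object msgId) {
            inflight.remove(msgId);
            // record progress / commit the file offset for msgId here
          }
        }

    Under that assumption the same cleanup path covers both modes, so any duplicate commit logic guarded by if (!ackEnabled) inside nextTuple() can be dropped.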
> Create HDFS Spout
> -----------------
>
>                 Key: STORM-1199
>                 URL: https://issues.apache.org/jira/browse/STORM-1199
>             Project: Apache Storm
>          Issue Type: New Feature
>            Reporter: Roshan Naik
>            Assignee: Roshan Naik
>         Attachments: HDFSSpoutforStorm v2.pdf, HDFSSpoutforStorm.pdf, hdfs-spout.1.patch
>
>
> Create an HDFS spout so that Storm can ingest data from files in an HDFS directory



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)