[ https://issues.apache.org/jira/browse/STORM-1199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15089542#comment-15089542 ]
ASF GitHub Bot commented on STORM-1199: --------------------------------------- Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/936#discussion_r49212163 --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java --- @@ -0,0 +1,739 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.hdfs.spout; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.net.URI; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +import backtype.storm.Config; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.storm.hdfs.common.HdfsUtils; +import org.apache.storm.hdfs.common.security.HdfsSecurityUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import backtype.storm.spout.SpoutOutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.base.BaseRichSpout; +import backtype.storm.tuple.Fields; + +public class HdfsSpout extends BaseRichSpout { + + // user configurable + private String hdfsUri; // required + private String readerType; // required + private Fields outputFields; // required + private Path sourceDirPath; // required + private Path archiveDirPath; // required + private Path badFilesDirPath; // required + private Path lockDirPath; + + private int commitFrequencyCount = Configs.DEFAULT_COMMIT_FREQ_COUNT; + private int commitFrequencySec = Configs.DEFAULT_COMMIT_FREQ_SEC; + private int maxOutstanding = Configs.DEFAULT_MAX_OUTSTANDING; + private int lockTimeoutSec = Configs.DEFAULT_LOCK_TIMEOUT; + private boolean clocksInSync = true; + + private String inprogress_suffix = ".inprogress"; + private String ignoreSuffix = ".ignore"; + + // other members + private static final Logger LOG = LoggerFactory.getLogger(HdfsSpout.class); + + private ProgressTracker tracker = null; + + private FileSystem hdfs; + private FileReader reader; + + private SpoutOutputCollector collector; + HashMap<MessageId, List<Object> 
> inflight = new HashMap<>(); + LinkedBlockingQueue<HdfsUtils.Pair<MessageId, List<Object>>> retryList = new LinkedBlockingQueue<>(); + + private Configuration hdfsConfig; + + private Map conf = null; + private FileLock lock; + private String spoutId = null; + + HdfsUtils.Pair<Path,FileLock.LogEntry> lastExpiredLock = null; + private long lastExpiredLockTime = 0; + + private long tupleCounter = 0; + private boolean ackEnabled = false; + private int acksSinceLastCommit = 0 ; + private final AtomicBoolean commitTimeElapsed = new AtomicBoolean(false); + private Timer commitTimer; + private boolean fileReadCompletely = true; + + private String configKey = Configs.DEFAULT_HDFS_CONFIG_KEY; // key for hdfs Kerberos configs + + public HdfsSpout() { + } + /** Name of the output field names. Number of fields depends upon the reader type */ + public HdfsSpout withOutputFields(String... fields) { + outputFields = new Fields(fields); + return this; + } + + /** set key name under which HDFS options are placed. (similar to HDFS bolt). + * default key name is 'hdfs.config' */ + public HdfsSpout withConfigKey(String configKey) { + this.configKey = configKey; + return this; + } + + public Path getLockDirPath() { + return lockDirPath; + } + + public SpoutOutputCollector getCollector() { + return collector; + } + + public void nextTuple() { + LOG.trace("Next Tuple {}", spoutId); + // 1) First re-emit any previously failed tuples (from retryList) + if (!retryList.isEmpty()) { + LOG.debug("Sending tuple from retry list"); + HdfsUtils.Pair<MessageId, List<Object>> pair = retryList.remove(); + emitData(pair.getValue(), pair.getKey()); + return; + } + + if( ackEnabled && tracker.size()>= maxOutstanding) { + LOG.warn("Waiting for more ACKs before generating new tuples. " + + "Progress tracker size has reached limit {}, SpoutID {}" + , maxOutstanding, spoutId); + // Don't emit anything .. 
allow configured spout wait strategy to kick in + return; + } + + // 2) If no failed tuples to be retried, then send tuples from hdfs + while (true) { + try { + // 3) Select a new file if one is not open already + if (reader == null) { + reader = pickNextFile(); + if (reader == null) { + LOG.debug("Currently no new files to process under : " + sourceDirPath); + return; + } else { + fileReadCompletely=false; + } + } + if( fileReadCompletely ) { // wait for more ACKs before proceeding + return; + } + // 4) Read record from file, emit to collector and record progress + List<Object> tuple = reader.next(); + if (tuple != null) { + fileReadCompletely= false; + ++tupleCounter; + MessageId msgId = new MessageId(tupleCounter, reader.getFilePath(), reader.getFileOffset()); + emitData(tuple, msgId); + + if(!ackEnabled) { + ++acksSinceLastCommit; // assume message is immediately ACKed in non-ack mode + commitProgress(reader.getFileOffset()); + } else { + commitProgress(tracker.getCommitPosition()); + } + return; + } else { + fileReadCompletely = true; + if(!ackEnabled) { + markFileAsDone(reader.getFilePath()); + } + } + } catch (IOException e) { + LOG.error("I/O Error processing at file location " + getFileProgress(reader), e); + // don't emit anything .. allow configured spout wait strategy to kick in + return; + } catch (ParseException e) { + LOG.error("Parsing error when processing at file location " + getFileProgress(reader) + + ". Skipping remainder of file.", e); + markFileAsBad(reader.getFilePath()); + // Note: We don't return from this method on ParseException to avoid triggering the + // spout wait strategy (due to no emits). 
Instead we go back into the loop and + // generate a tuple from next file + } + } // while + } + + // will commit progress into lock file if commit threshold is reached + private void commitProgress(FileOffset position) { + if(position==null) { + return; + } + if ( lock!=null && canCommitNow() ) { + try { + String pos = position.toString(); + lock.heartbeat(pos); + LOG.debug("{} Committed progress. {}", spoutId, pos); + acksSinceLastCommit = 0; + commitTimeElapsed.set(false); + setupCommitElapseTimer(); + } catch (IOException e) { + LOG.error("Unable to commit progress Will retry later. Spout ID = " + spoutId, e); + } + } + } + + private void setupCommitElapseTimer() { + if(commitFrequencySec<=0) { + return; + } + TimerTask timerTask = new TimerTask() { + @Override + public void run() { + commitTimeElapsed.set(true); + } + }; + commitTimer.schedule(timerTask, commitFrequencySec * 1000); + } + + private static String getFileProgress(FileReader reader) { + return reader.getFilePath() + " " + reader.getFileOffset(); + } + + private void markFileAsDone(Path filePath) { + try { + Path newFile = renameCompletedFile(reader.getFilePath()); + LOG.info("Completed processing {}. Spout Id = {}", newFile, spoutId); + } catch (IOException e) { + LOG.error("Unable to archive completed file" + filePath + " Spout ID " + spoutId, e); + } + closeReaderAndResetTrackers(); + } + + private void markFileAsBad(Path file) { + String fileName = file.toString(); + String fileNameMinusSuffix = fileName.substring(0, fileName.indexOf(inprogress_suffix)); + String originalName = new Path(fileNameMinusSuffix).getName(); + Path newFile = new Path( badFilesDirPath + Path.SEPARATOR + originalName); + + LOG.info("Moving bad file {} to {}. Processed it till offset {}. 
SpoutID= {}", originalName, newFile, tracker.getCommitPosition(), spoutId); + try { + if (!hdfs.rename(file, newFile) ) { // seems this can fail by returning false or throwing exception + throw new IOException("Move failed for bad file: " + file); // convert false ret value to exception + } + } catch (IOException e) { + LOG.warn("Error moving bad file: " + file + " to destination " + newFile + " SpoutId =" + spoutId, e); + } + closeReaderAndResetTrackers(); + } + + private void closeReaderAndResetTrackers() { + inflight.clear(); + tracker.offsets.clear(); + retryList.clear(); + + reader.close(); + reader = null; + releaseLockAndLog(lock, spoutId); + lock = null; + } + + private static void releaseLockAndLog(FileLock fLock, String spoutId) { + try { + if(fLock!=null) { + fLock.release(); + LOG.debug("Spout {} released FileLock. SpoutId = {}", fLock.getLockFile(), spoutId); + } + } catch (IOException e) { + LOG.error("Unable to delete lock file : " +fLock.getLockFile() + " SpoutId =" + spoutId, e); + } + } + + protected void emitData(List<Object> tuple, MessageId id) { + LOG.trace("Emitting - {}", id); + this.collector.emit(tuple, id); + inflight.put(id, tuple); + } + + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + LOG.info("Opening HDFS Spout"); + this.conf = conf; + this.commitTimer = new Timer(); + this.tracker = new ProgressTracker(); + this.hdfsConfig = new Configuration(); + + this.collector = collector; + this.hdfsConfig = new Configuration(); + this.tupleCounter = 0; + + // Hdfs related settings + if( conf.containsKey(Configs.HDFS_URI)) { + this.hdfsUri = conf.get(Configs.HDFS_URI).toString(); + } else { + throw new RuntimeException(Configs.HDFS_URI + " setting is required"); + } + + try { + this.hdfs = FileSystem.get(URI.create(hdfsUri), hdfsConfig); + } catch (IOException e) { + LOG.error("Unable to instantiate file system", e); + throw new RuntimeException("Unable to instantiate file system", e); + } + + + if ( 
conf.containsKey(configKey) ) { + Map<String, Object> map = (Map<String, Object>)conf.get(configKey); + if(map != null) { + for(String keyName : map.keySet()){ + LOG.info("HDFS Config override : " + keyName + " = " + String.valueOf(map.get(keyName))); --- End diff -- minor nit, can we use "{}" for variable substitution. > Create HDFS Spout > ----------------- > > Key: STORM-1199 > URL: https://issues.apache.org/jira/browse/STORM-1199 > Project: Apache Storm > Issue Type: New Feature > Reporter: Roshan Naik > Assignee: Roshan Naik > Attachments: HDFSSpoutforStorm v2.pdf, HDFSSpoutforStorm.pdf, > hdfs-spout.1.patch > > > Create an HDFS spout so that Storm can suck in data from files in an HDFS > directory -- This message was sent by Atlassian JIRA (v6.3.4#6332)