[ https://issues.apache.org/jira/browse/STORM-1199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15057680#comment-15057680 ]
ASF GitHub Bot commented on STORM-1199: --------------------------------------- Github user satishd commented on a diff in the pull request: https://github.com/apache/storm/pull/936#discussion_r47612027 --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java --- @@ -0,0 +1,654 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.hdfs.spout; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +import backtype.storm.Config; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.storm.hdfs.common.HdfsUtils; +import org.apache.storm.hdfs.common.security.HdfsSecurityUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import backtype.storm.spout.SpoutOutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.base.BaseRichSpout; +import backtype.storm.tuple.Fields; + +public class HdfsSpout extends BaseRichSpout { + + private static final Logger LOG = LoggerFactory.getLogger(HdfsSpout.class); + + private Path sourceDirPath; + private Path archiveDirPath; + private Path badFilesDirPath; + private Path lockDirPath; + + private int commitFrequencyCount = Configs.DEFAULT_COMMIT_FREQ_COUNT; + private int commitFrequencySec = Configs.DEFAULT_COMMIT_FREQ_SEC; + private int maxDuplicates = Configs.DEFAULT_MAX_DUPLICATES; + private int lockTimeoutSec = Configs.DEFAULT_LOCK_TIMEOUT; + private boolean clocksInSync = true; + + private ProgressTracker tracker = new ProgressTracker(); + + private FileSystem hdfs; + private FileReader reader; + + private SpoutOutputCollector collector; + HashMap<MessageId, List<Object> > inflight = new HashMap<>(); + LinkedBlockingQueue<HdfsUtils.Pair<MessageId, List<Object>>> retryList = new LinkedBlockingQueue<>(); + + private String inprogress_suffix = ".inprogress"; + private String ignoreSuffix = ".ignore"; + + private Configuration hdfsConfig; + private String readerType; + + 
private Map conf = null; + private FileLock lock; + private String spoutId = null; + + HdfsUtils.Pair<Path,FileLock.LogEntry> lastExpiredLock = null; + private long lastExpiredLockTime = 0; + + private long tupleCounter = 0; + private boolean ackEnabled = false; + private int acksSinceLastCommit = 0 ; + private final AtomicBoolean commitTimeElapsed = new AtomicBoolean(false); + private final Timer commitTimer = new Timer(); + private boolean fileReadCompletely = false; + + private String configKey = Configs.DEFAULT_HDFS_CONFIG_KEY; // key for hdfs kerberos configs + + public HdfsSpout() { + } + + public Path getLockDirPath() { + return lockDirPath; + } + + public SpoutOutputCollector getCollector() { + return collector; + } + + public HdfsSpout withConfigKey(String configKey){ + this.configKey = configKey; + return this; + } + + public void nextTuple() { + LOG.debug("Next Tuple"); + // 1) First re-emit any previously failed tuples (from retryList) + if (!retryList.isEmpty()) { + LOG.debug("Sending from retry list"); + HdfsUtils.Pair<MessageId, List<Object>> pair = retryList.remove(); + emitData(pair.getValue(), pair.getKey()); + return; + } + + if( ackEnabled && tracker.size()>=maxDuplicates ) { + LOG.warn("Waiting for more ACKs before generating new tuples. " + + "Progress tracker size has reached limit {}" + , maxDuplicates); + // Don't emit anything .. 
allow configured spout wait strategy to kick in + return; + } + + // 2) If no failed tuples, then send tuples from hdfs + while (true) { + try { + // 3) Select a new file if one is not open already + if (reader == null) { + reader = pickNextFile(); + if (reader == null) { + LOG.info("Currently no new files to process under : " + sourceDirPath); + return; + } + } + + // 4) Read record from file, emit to collector and record progress + List<Object> tuple = reader.next(); + if (tuple != null) { + fileReadCompletely= false; + ++tupleCounter; + MessageId msgId = new MessageId(tupleCounter, reader.getFilePath(), reader.getFileOffset()); + emitData(tuple, msgId); + + if(!ackEnabled) { + ++acksSinceLastCommit; // assume message is immediately acked in non-ack mode + commitProgress(reader.getFileOffset()); + } else { + commitProgress(tracker.getCommitPosition()); + } + return; + } else { + fileReadCompletely = true; + if(!ackEnabled) { + markFileAsDone(reader.getFilePath()); + } + } + } catch (IOException e) { + LOG.error("I/O Error processing at file location " + getFileProgress(reader), e); + // don't emit anything .. allow configured spout wait strategy to kick in + return; + } catch (ParseException e) { + LOG.error("Parsing error when processing at file location " + getFileProgress(reader) + + ". Skipping remainder of file.", e); + markFileAsBad(reader.getFilePath()); + // note: Unfortunately not emitting anything here due to parse error + // will trigger the configured spout wait strategy which is unnecessary + } + } + + } + + // will commit progress into lock file if commit threshold is reached + private void commitProgress(FileOffset position) { + if ( lock!=null && canCommitNow() ) { + try { + lock.heartbeat(position.toString()); --- End diff -- position parameter can be null, it should be handled. 
> Create HDFS Spout > ----------------- > > Key: STORM-1199 > URL: https://issues.apache.org/jira/browse/STORM-1199 > Project: Apache Storm > Issue Type: New Feature > Reporter: Roshan Naik > Assignee: Roshan Naik > Attachments: HDFSSpoutforStorm v2.pdf, HDFSSpoutforStorm.pdf, > hdfs-spout.1.patch > > > Create an HDFS spout so that Storm can suck in data from files in an HDFS > directory -- This message was sent by Atlassian JIRA (v6.3.4#6332)