srkukarni closed pull request #2409: Added HDFS Sink
URL: https://github.com/apache/incubator-pulsar/pull/2409
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/pom.xml b/pom.xml
index 681f18e931..5393d227ce 100644
--- a/pom.xml
+++ b/pom.xml
@@ -494,12 +494,6 @@ flexible messaging model and an intuitive client API.</description>
         <version>${log4j2.version}</version>
       </dependency>
 
-      <dependency>
-        <groupId>commons-io</groupId>
-        <artifactId>commons-io</artifactId>
-        <version>2.5</version>
-      </dependency>
-
       <dependency>
         <groupId>commons-codec</groupId>
         <artifactId>commons-codec</artifactId>
@@ -1345,6 +1339,50 @@ flexible messaging model and an intuitive client API.</description>
     <profile>
       <id>docker</id>
     </profile>
+    
+    <profile>
+      <!-- Checks style and licensing requirements. This is a good
+           idea to run for contributions and for the release process. While it would
+           be nice to run always, these plugins can considerably slow the build and have
+           proven to create unstable builds in our multi-module project and when building
+           using multiple threads. The stability issues seen with Checkstyle in multi-module
+           builds include false positives and false negatives. -->
+      <id>contrib-check</id>
+         <build>
+            <plugins>
+                <plugin>
+                    <groupId>org.apache.rat</groupId>
+                    <artifactId>apache-rat-plugin</artifactId>
+                    <executions>
+                      <execution>
+                          <goals>
+                              <goal>check</goal>
+                          </goals>
+                          <phase>verify</phase>
+                      </execution>
+                   </executions>
+                </plugin>
+                <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-checkstyle-plugin</artifactId>
+                    <executions>
+                       <execution>
+                          <id>check-style</id>
+                          <phase>verify</phase>
+                          <configuration>
+                              <configLocation>./buildtools/src/main/resources/pulsar/checkstyle.xml</configLocation>
+                              <suppressionsLocation>/buildtools/src/main/resources/pulsar/suppressions.xml</suppressionsLocation>
+                              <encoding>UTF-8</encoding>
+                          </configuration>
+                          <goals>
+                             <goal>check</goal>
+                          </goals>
+                       </execution>
+                    </executions>
+                </plugin>
+             </plugins>
+         </build>
+    </profile>
   </profiles>
 
   <repositories>
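
Note: both plugin executions in the contrib-check profile above are bound to
the verify phase, so the checks can be exercised during a contribution or
release build with (a usage note; the profile id comes from the diff above):

    mvn verify -Pcontrib-check
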
diff --git a/pulsar-io/hdfs/pom.xml b/pulsar-io/hdfs/pom.xml
new file mode 100644
index 0000000000..0d552077d5
--- /dev/null
+++ b/pulsar-io/hdfs/pom.xml
@@ -0,0 +1,69 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.pulsar</groupId>
+    <artifactId>pulsar-io</artifactId>
+    <version>2.2.0-incubating-SNAPSHOT</version>
+  </parent>
+  <artifactId>pulsar-io-hdfs</artifactId>
+  
+  <dependencies>
+     <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.dataformat</groupId>
+      <artifactId>jackson-dataformat-yaml</artifactId>
+    </dependency>
+    
+       <dependency>
+               <groupId>org.apache.hadoop</groupId>
+               <artifactId>hadoop-client</artifactId>
+               <version>3.1.1</version>
+       </dependency>
+
+       <dependency>
+               <groupId>org.testng</groupId>
+               <artifactId>testng</artifactId>
+               <scope>test</scope>
+       </dependency>
+  </dependencies>
+  
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-nar-maven-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+  
+</project>
\ No newline at end of file
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConfig.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConfig.java
new file mode 100644
index 0000000000..529c350950
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConfig.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs;
+
+import java.io.Serializable;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * Configuration object for all HDFS components.
+ */
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+@Accessors(chain = true)
+public abstract class AbstractHdfsConfig implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * A file or comma separated list of files which contains the Hadoop file system configuration,
+     * e.g. 'core-site.xml', 'hdfs-site.xml'.
+     */
+    private String hdfsConfigResources;
+
+    /**
+     * The HDFS directory from which files should be read from or written to.
+     */
+    private String directory;
+
+    /**
+     * The character encoding for the files, e.g. UTF-8, ASCII, etc.
+     */
+    private String encoding;
+
+    /**
+     * The compression codec used to compress/de-compress the files on HDFS.
+     */
+    private Compression compression;
+
+    /**
+     * The Kerberos user principal account to use for authentication.
+     */
+    private String kerberosUserPrincipal;
+
+    /**
+     * The full pathname to the Kerberos keytab file to use for authentication.
+     */
+    private String keytab;
+
+    public void validate() {
+        if (StringUtils.isEmpty(hdfsConfigResources) || StringUtils.isEmpty(directory)) {
+            throw new IllegalArgumentException("Required property not set.");
+        }
+
+        if ((StringUtils.isNotEmpty(kerberosUserPrincipal) && StringUtils.isEmpty(keytab))
+            || (StringUtils.isEmpty(kerberosUserPrincipal) && StringUtils.isNotEmpty(keytab))) {
+            throw new IllegalArgumentException("Values for both kerberosUserPrincipal & keytab are required.");
+        }
+    }
+}
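
For illustration, the keytab/principal pairing rule enforced by validate()
can be exercised directly (a sketch; AbstractHdfsConfig has no abstract
methods, so an anonymous subclass compiles, and all values are hypothetical):

    AbstractHdfsConfig cfg = new AbstractHdfsConfig() {};
    cfg.setHdfsConfigResources("core-site.xml,hdfs-site.xml"); // hypothetical resources
    cfg.setDirectory("/pulsar/topicA");                        // hypothetical directory
    cfg.setKerberosUserPrincipal("pulsar@EXAMPLE.COM");        // principal without a keytab...
    cfg.validate(); // ...throws IllegalArgumentException
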
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConnector.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConnector.java
new file mode 100644
index 0000000000..0eccd93dc7
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConnector.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs;
+
+import java.io.IOException;
+import java.lang.ref.WeakReference;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.URI;
+import java.nio.charset.Charset;
+import java.security.PrivilegedExceptionAction;
+import java.util.Collections;
+import java.util.Map;
+import java.util.WeakHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.net.SocketFactory;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.pulsar.io.hdfs.sink.HdfsSinkConfig;
+
+/**
+ * A simple abstract class for HDFS connectors.
+ * Provides methods for connecting to HDFS.
+ */
+public abstract class AbstractHdfsConnector {
+
+    private static final Object RESOURCES_LOCK = new Object();
+
+    // Hadoop Configuration, Filesystem, and UserGroupInformation (optional)
+    protected final AtomicReference<HdfsResources> hdfsResources = new AtomicReference<>();
+    protected AbstractHdfsConfig connectorConfig;
+    protected CompressionCodecFactory compressionCodecFactory;
+
+    public AbstractHdfsConnector() {
+       hdfsResources.set(new HdfsResources(null, null, null));
+    }
+
+    /*
+     * Reset Hadoop Configuration and FileSystem based on the supplied configuration resources.
+     */
+    protected HdfsResources resetHDFSResources(HdfsSinkConfig hdfsSinkConfig) throws IOException {
+        Configuration config = new ExtendedConfiguration();
+        config.setClassLoader(Thread.currentThread().getContextClassLoader());
+
+        getConfig(config, connectorConfig.getHdfsConfigResources());
+
+        // first check for timeout on HDFS connection, because FileSystem has a hard-coded 15 minute timeout
+        checkHdfsUriForTimeout(config);
+
+        /* Disable caching of Configuration and FileSystem objects, else we cannot reconfigure
+         * the processor without a complete restart
+         */
+        String disableCacheName = String.format("fs.%s.impl.disable.cache",
+                FileSystem.getDefaultUri(config).getScheme());
+        config.set(disableCacheName, "true");
+
+        // If kerberos is enabled, create the file system as the kerberos principal
+        // -- use RESOURCES_LOCK to guarantee UserGroupInformation is accessed by only a single thread at a time
+        FileSystem fs;
+        UserGroupInformation ugi;
+        synchronized (RESOURCES_LOCK) {
+            if (SecurityUtil.isSecurityEnabled(config)) {
+                ugi = SecurityUtil.loginKerberos(config,
+                        connectorConfig.getKerberosUserPrincipal(), connectorConfig.getKeytab());
+                fs = getFileSystemAsUser(config, ugi);
+            } else {
+                config.set("ipc.client.fallback-to-simple-auth-allowed", 
"true");
+                config.set("hadoop.security.authentication", "simple");
+                ugi = SecurityUtil.loginSimple(config);
+                fs = getFileSystemAsUser(config, ugi);
+            }
+        }
+        return new HdfsResources(config, fs, ugi);
+    }
+
+    private static Configuration getConfig(final Configuration config, String res) throws IOException {
+        boolean foundResources = false;
+        if (null != res) {
+            String[] resources = res.split(",");
+            for (String resource : resources) {
+                config.addResource(new Path(resource.trim()));
+                foundResources = true;
+            }
+        }
+
+        if (!foundResources) {
+            // check that at least 1 non-default resource is available on the classpath
+            String configStr = config.toString();
+            for (String resource : configStr.substring(configStr.indexOf(":") + 1).split(",")) {
+                if (!resource.contains("default") && config.getResource(resource.trim()) != null) {
+                    foundResources = true;
+                    break;
+                }
+            }
+        }
+
+        if (!foundResources) {
+            throw new IOException("Could not find any of the " + res + " on 
the classpath");
+        }
+        return config;
+    }
+
+    /*
+     * Reduce the timeout of a socket connection from the default in FileSystem.get()
+     */
+    protected void checkHdfsUriForTimeout(Configuration config) throws IOException {
+        URI hdfsUri = FileSystem.getDefaultUri(config);
+        String address = hdfsUri.getAuthority();
+        int port = hdfsUri.getPort();
+        if (address == null || address.isEmpty() || port < 0) {
+            return;
+        }
+        InetSocketAddress namenode = NetUtils.createSocketAddr(address, port);
+        SocketFactory socketFactory = NetUtils.getDefaultSocketFactory(config);
+        Socket socket = null;
+        try {
+            socket = socketFactory.createSocket();
+            NetUtils.connect(socket, namenode, 1000); // 1 second timeout
+        } finally {
+            IOUtils.closeQuietly(socket);
+        }
+    }
+
+    /**
+     * This exists in order to allow unit tests to override it so that they don't take several
+     * minutes waiting for UDP packets to be received.
+     *
+     * @param config
+     *            the configuration to use
+     * @return the FileSystem that is created for the given Configuration
+     * @throws IOException
+     *             if unable to create the FileSystem
+     */
+    protected FileSystem getFileSystem(final Configuration config) throws IOException {
+        return FileSystem.get(config);
+    }
+
+    protected FileSystem getFileSystemAsUser(final Configuration config, UserGroupInformation ugi) throws IOException {
+        try {
+            return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
+                @Override
+                public FileSystem run() throws Exception {
+                    return FileSystem.get(config);
+                }
+            });
+        } catch (InterruptedException e) {
+            throw new IOException("Unable to create file system: " + 
e.getMessage());
+        }
+    }
+
+    protected Configuration getConfiguration() {
+        return hdfsResources.get().getConfiguration();
+    }
+
+    protected FileSystem getFileSystem() {
+        return hdfsResources.get().getFileSystem();
+    }
+
+    protected UserGroupInformation getUserGroupInformation() {
+        return hdfsResources.get().getUserGroupInformation();
+    }
+
+    protected String getEncoding() {
+        return StringUtils.isNotBlank(connectorConfig.getEncoding())
+                   ? connectorConfig.getEncoding() : Charset.defaultCharset().name();
+    }
+
+    protected CompressionCodec getCompressionCodec() {
+       if (connectorConfig.getCompression() == null) {
+           return null;
+       }
+
+       CompressionCodec codec = getCompressionCodecFactory()
+               .getCodecByName(connectorConfig.getCompression().name());
+
+       return (codec != null) ? codec : new DefaultCodec();
+    }
+
+    protected CompressionCodecFactory getCompressionCodecFactory() {
+        if (compressionCodecFactory == null) {
+            compressionCodecFactory = new CompressionCodecFactory(getConfiguration());
+        }
+
+        return compressionCodecFactory;
+    }
+
+    /**
+     * Extending Hadoop Configuration to prevent it from caching classes that can't be found. Since users may be
+     * adding additional JARs to the classpath we don't want them to have to restart the JVM to be able to load
+     * something that was previously not found, but might now be available.
+     * Reference the original getClassByNameOrNull from Configuration.
+     */
+    static class ExtendedConfiguration extends Configuration {
+
+        private final Map<ClassLoader, Map<String, WeakReference<Class<?>>>> cacheClasses = new WeakHashMap<>();
+
+        @Override
+        public Class<?> getClassByNameOrNull(String name) {
+            final ClassLoader classLoader = getClassLoader();
+
+            Map<String, WeakReference<Class<?>>> map;
+            synchronized (cacheClasses) {
+                map = cacheClasses.get(classLoader);
+                if (map == null) {
+                    map = Collections.synchronizedMap(new WeakHashMap<>());
+                    cacheClasses.put(classLoader, map);
+                }
+            }
+
+            Class<?> clazz = null;
+            WeakReference<Class<?>> ref = map.get(name);
+            if (ref != null) {
+                clazz = ref.get();
+            }
+
+            if (clazz == null) {
+                try {
+                    clazz = Class.forName(name, true, classLoader);
+                } catch (ClassNotFoundException e) {
+                    return null;
+                }
+                // two putters can race here, but they'll put the same class
+                map.put(name, new WeakReference<>(clazz));
+                return clazz;
+            } else {
+                // cache hit
+                return clazz;
+            }
+        }
+
+    }
+}
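
As a concrete trace of the cache-disabling logic in resetHDFSResources()
above (a sketch; the namenode URI is hypothetical):

    Configuration config = new Configuration();
    config.set("fs.defaultFS", "hdfs://namenode.example.com:8020"); // hypothetical namenode
    // getScheme() yields "hdfs", so the key below is "fs.hdfs.impl.disable.cache"
    String disableCacheName = String.format("fs.%s.impl.disable.cache",
            FileSystem.getDefaultUri(config).getScheme());
    config.set(disableCacheName, "true"); // forces FileSystem.get() to build a fresh instance
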
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/Compression.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/Compression.java
new file mode 100644
index 0000000000..97dba5321d
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/Compression.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs;
+
+/**
+ * An enumeration of compression codecs available for HDFS.
+ */
+public enum Compression {
+    BZIP2, DEFLATE, GZIP, LZ4, SNAPPY
+}
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/HdfsResources.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/HdfsResources.java
new file mode 100644
index 0000000000..1d04c6cc17
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/HdfsResources.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * A wrapper class for HDFS resources.
+ */
+public class HdfsResources {
+
+    private final Configuration configuration;
+    private final FileSystem fileSystem;
+    private final UserGroupInformation userGroupInformation;
+
+    public HdfsResources(Configuration config, FileSystem fs, UserGroupInformation ugi) {
+        this.configuration = config;
+        this.fileSystem = fs;
+        this.userGroupInformation = ugi;
+    }
+
+    public Configuration getConfiguration() {
+        return configuration;
+    }
+
+    public FileSystem getFileSystem() {
+        return fileSystem;
+    }
+
+    public UserGroupInformation getUserGroupInformation() {
+        return userGroupInformation;
+    }
+}
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/SecurityUtil.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/SecurityUtil.java
new file mode 100644
index 0000000000..c5462d3d09
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/SecurityUtil.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs;
+
+import java.io.IOException;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Provides synchronized access to UserGroupInformation to avoid multiple processors/services from
+ * interfering with each other.
+ */
+public class SecurityUtil {
+    public static final String HADOOP_SECURITY_AUTHENTICATION = "hadoop.security.authentication";
+    public static final String KERBEROS = "kerberos";
+
+    /**
+     *  Initializes UserGroupInformation with the given Configuration and performs the login for the
+     *  given principal and keytab. All logins should happen through this class to ensure other threads
+     *  are not concurrently modifying UserGroupInformation.
+     * <p/>
+     * @param config the configuration instance
+     * @param principal the principal to authenticate as
+     * @param keyTab the keytab to authenticate with
+     *
+     * @return the UGI for the given principal
+     *
+     * @throws IOException if login failed
+     */
+    public static synchronized UserGroupInformation loginKerberos(final Configuration config,
+            final String principal, final String keyTab) throws IOException {
+        Validate.notNull(config);
+        Validate.notNull(principal);
+        Validate.notNull(keyTab);
+
+        UserGroupInformation.setConfiguration(config);
+        UserGroupInformation.loginUserFromKeytab(principal.trim(), keyTab.trim());
+        return UserGroupInformation.getCurrentUser();
+    }
+
+    /**
+     * Initializes UserGroupInformation with the given Configuration and
+     * returns UserGroupInformation.getLoginUser(). All logins should happen
+     * through this class to ensure other threads are not concurrently
+     * modifying UserGroupInformation.
+     *
+     * @param config the configuration instance
+     *
+     * @return the UGI for the given principal
+     *
+     * @throws IOException if login failed
+     */
+    public static synchronized UserGroupInformation loginSimple(final Configuration config) throws IOException {
+        Validate.notNull(config);
+        UserGroupInformation.setConfiguration(config);
+        return UserGroupInformation.getLoginUser();
+    }
+
+    /**
+     * Initializes UserGroupInformation with the given Configuration and returns
+     * UserGroupInformation.isSecurityEnabled().
+     * All checks for isSecurityEnabled() should happen through this method.
+     *
+     * @param config the given configuration
+     *
+     * @return true if kerberos is enabled on the given configuration, false otherwise
+     *
+     */
+    public static boolean isSecurityEnabled(final Configuration config) {
+        Validate.notNull(config);
+        return KERBEROS.equalsIgnoreCase(config.get(HADOOP_SECURITY_AUTHENTICATION));
+    }
+}
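
A minimal sketch of the login flow the connector follows, assuming the
core-site.xml on the classpath sets hadoop.security.authentication=kerberos
and using hypothetical principal/keytab values:

    Configuration config = new Configuration();
    config.addResource(new Path("core-site.xml"));
    UserGroupInformation ugi = SecurityUtil.isSecurityEnabled(config)
            ? SecurityUtil.loginKerberos(config, "pulsar@EXAMPLE.COM",
                    "/etc/security/keytabs/pulsar.keytab")
            : SecurityUtil.loginSimple(config);
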
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/package-info.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/package-info.java
new file mode 100644
index 0000000000..42948521f3
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs;
\ No newline at end of file
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsAbstractSink.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsAbstractSink.java
new file mode 100644
index 0000000000..18184e238c
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsAbstractSink.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs.sink;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.KeyValue;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.hdfs.AbstractHdfsConnector;
+import org.apache.pulsar.io.hdfs.HdfsResources;
+
+/**
+ * A simple abstract class for HDFS sinks.
+ * Users need to implement the extractKeyValue function to use this sink.
+ */
+public abstract class HdfsAbstractSink<K, V> extends AbstractHdfsConnector implements Sink<V> {
+
+    protected HdfsSinkConfig hdfsSinkConfig;
+    protected BlockingQueue<Record<V>> unackedRecords;
+    protected HdfsSyncThread<V> syncThread;
+    private Path path;
+    private FSDataOutputStream hdfsStream;
+
+    public abstract KeyValue<K, V> extractKeyValue(Record<V> record);
+    protected abstract void createWriter() throws IOException;
+
+    @Override
+    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
+       hdfsSinkConfig = HdfsSinkConfig.load(config);
+       hdfsSinkConfig.validate();
+       connectorConfig = hdfsSinkConfig;
+       unackedRecords = new LinkedBlockingQueue<Record<V>>(hdfsSinkConfig.getMaxPendingRecords());
+       connectToHdfs();
+       createWriter();
+       launchSyncThread();
+    }
+
+    @Override
+    public void close() throws Exception {
+       syncThread.halt();
+       syncThread.join(0);
+    }
+
+    protected final void connectToHdfs() throws IOException {
+       try {
+           HdfsResources resources = hdfsResources.get();
+
+           if (resources.getConfiguration() == null) {
+               resources = this.resetHDFSResources(hdfsSinkConfig);
+               hdfsResources.set(resources);
+           }
+       } catch (IOException ex) {
+          hdfsResources.set(new HdfsResources(null, null, null));
+          throw ex;
+       }
+    }
+
+    @SuppressWarnings("rawtypes")
+    protected final FSDataOutputStreamBuilder getOutputStreamBuilder() throws IOException {
+        Path path = getPath();
+        FileSystem fs = getFileSystemAsUser(getConfiguration(), getUserGroupInformation());
+        FSDataOutputStreamBuilder builder = fs.exists(path) ? fs.appendFile(path) : fs.createFile(path);
+        return builder.recursive().permission(new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
+    }
+
+    protected FSDataOutputStream getHdfsStream() throws IllegalArgumentException, IOException {
+        if (hdfsStream == null) {
+            hdfsStream = getOutputStreamBuilder().build();
+        }
+        return hdfsStream;
+    }
+
+    protected final Path getPath() {
+        if (path == null) {
+            String ext = "";
+            if (StringUtils.isNotBlank(hdfsSinkConfig.getFileExtension())) {
+                ext = hdfsSinkConfig.getFileExtension();
+            } else if (getCompressionCodec() != null) {
+                ext = getCompressionCodec().getDefaultExtension();
+            }
+
+            path = new Path(FilenameUtils.concat(hdfsSinkConfig.getDirectory(),
+                    hdfsSinkConfig.getFilenamePrefix() + "-" + System.currentTimeMillis() + ext));
+        }
+        return path;
+    }
+
+    protected final void launchSyncThread() throws IOException {
+        syncThread = new HdfsSyncThread<V>(getHdfsStream(), unackedRecords, hdfsSinkConfig.getSyncInterval());
+        syncThread.start();
+    }
+}
\ No newline at end of file
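
For reference, getPath() above builds each output file name from the
configured directory, prefix, and extension; a sketch with hypothetical
values:

    // mirrors getPath(): directory + prefix + "-" + currentTimeMillis + ext
    String fullPath = FilenameUtils.concat("/pulsar/topicA",
            "topicA" + "-" + System.currentTimeMillis() + ".txt");
    // e.g. /pulsar/topicA/topicA-1535057507000.txt
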
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsSinkConfig.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsSinkConfig.java
new file mode 100644
index 0000000000..e9f4cae5b7
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsSinkConfig.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs.sink;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.io.hdfs.AbstractHdfsConfig;
+
+/**
+ * Configuration object for all HDFS Sink components.
+ */
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode(callSuper = false)
+@ToString
+@Accessors(chain = true)
+public class HdfsSinkConfig extends AbstractHdfsConfig implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * The prefix of the files to create inside the HDFS directory, e.g. a value of "topicA"
+     * will result in files named topicA-<timestamp> being produced.
+     */
+    private String filenamePrefix;
+
+    /**
+     * The extension to add to the files written to HDFS, e.g. '.txt', '.seq', etc.
+     */
+    private String fileExtension;
+
+    /**
+     * The character to use to separate records in a text file. If no value is provided
+     * then the content from all of the records will be concatenated together in one continuous
+     * byte array.
+     */
+    private char separator;
+
+    /**
+     * The interval (in milliseconds) between calls to flush data to HDFS disk.
+     */
+    private long syncInterval;
+
+    /**
+     * The maximum number of records that we hold in memory before acking. Default is Integer.MAX_VALUE.
+     * Setting this value to one results in every record being sent to disk before the record is acked,
+     * while setting it to a higher value allows us to buffer records before flushing them all to disk.
+     */
+    private int maxPendingRecords = Integer.MAX_VALUE;
+
+    public static HdfsSinkConfig load(String yamlFile) throws IOException {
+       ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+       return mapper.readValue(new File(yamlFile), HdfsSinkConfig.class);
+    }
+
+    public static HdfsSinkConfig load(Map<String, Object> map) throws IOException {
+       ObjectMapper mapper = new ObjectMapper();
+       return mapper.readValue(mapper.writeValueAsString(map), HdfsSinkConfig.class);
+    }
+
+    @Override
+    public void validate() {
+        super.validate();
+        if ((StringUtils.isEmpty(fileExtension) && getCompression() == null)
+            || StringUtils.isEmpty(filenamePrefix)) {
+           throw new IllegalArgumentException("Required property not set.");
+        }
+
+        if (syncInterval < 0) {
+          throw new IllegalArgumentException("Sync Interval cannot be 
negative");
+        }
+
+        if (maxPendingRecords < 1) {
+          throw new IllegalArgumentException("Max Pending Records must be a 
positive integer");
+        }
+    }
+}
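
A sketch of loading and validating this config from a Map, mirroring what
HdfsAbstractSink.open() does (all values hypothetical):

    Map<String, Object> map = new HashMap<>();
    map.put("hdfsConfigResources", "core-site.xml,hdfs-site.xml");
    map.put("directory", "/pulsar/topicA");
    map.put("filenamePrefix", "topicA");
    map.put("fileExtension", ".txt");
    map.put("syncInterval", 1000);
    map.put("maxPendingRecords", 5000);
    HdfsSinkConfig sinkConfig = HdfsSinkConfig.load(map);
    sinkConfig.validate(); // throws IllegalArgumentException on a missing required property
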
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsSyncThread.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsSyncThread.java
new file mode 100644
index 0000000000..3c19ed800f
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsSyncThread.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs.sink;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.fs.Syncable;
+import org.apache.pulsar.functions.api.Record;
+
+/**
+ * A thread that runs in the background and acknowledges Records
+ * after they have been written to disk.
+ *
+ * @param <V> the type of Record value being acknowledged
+ */
+public class HdfsSyncThread<V> extends Thread {
+
+    private final Syncable stream;
+    private final BlockingQueue<Record<V>> unackedRecords;
+    private final long syncInterval;
+    private volatile boolean keepRunning = true; // volatile: halt() is called from another thread
+
+    public HdfsSyncThread(Syncable stream, BlockingQueue<Record<V>> unackedRecords, long syncInterval) {
+      this.stream = stream;
+      this.unackedRecords = unackedRecords;
+      this.syncInterval = syncInterval;
+    }
+
+    @Override
+    public void run() {
+       while (keepRunning) {
+         try {
+            Thread.sleep(syncInterval);
+            ackRecords();
+         } catch (InterruptedException e) {
+            return;
+         } catch (IOException e) {
+            e.printStackTrace();
+         }
+       }
+    }
+
+    public final void halt() throws IOException, InterruptedException {
+       keepRunning = false;
+       ackRecords();
+    }
+
+    private void ackRecords() throws IOException, InterruptedException {
+
+        if (CollectionUtils.isEmpty(unackedRecords)) {
+           return;
+        }
+
+        synchronized (stream) {
+          stream.hsync();
+        }
+
+        while (!unackedRecords.isEmpty()) {
+          unackedRecords.take().ack();
+        }
+    }
+}
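
A usage sketch of the sync/ack contract above, assuming 'stream' is an
already-opened FSDataOutputStream (which implements Syncable):

    BlockingQueue<Record<String>> pending = new LinkedBlockingQueue<>(1000);
    HdfsSyncThread<String> syncThread = new HdfsSyncThread<>(stream, pending, 1000); // hsync once a second
    syncThread.start();
    // records put(...) into 'pending' after a successful write are ack()ed
    // only once stream.hsync() has persisted them
    syncThread.halt(); // on shutdown: flushes and acks whatever is still pending
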
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/package-info.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/package-info.java
new file mode 100644
index 0000000000..c6506d98b8
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs.sink;
\ No newline at end of file
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsAbstractSequenceFileSink.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsAbstractSequenceFileSink.java
new file mode 100644
index 0000000000..7c61c2070b
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsAbstractSequenceFileSink.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs.sink.seq;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.io.SequenceFile.Writer.Option;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.KeyValue;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.hdfs.sink.HdfsAbstractSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * HDFS Sink that writes its contents to HDFS as Sequence Files.
+ *
+ * @param <K> - The incoming Key type
+ * @param <V> - The incoming Value type
+ * @param <HdfsK> - The HDFS Key type
+ * @param <HdfsV> - The HDFS Value type
+ */
+public abstract class HdfsAbstractSequenceFileSink<K, V, HdfsK, HdfsV>
+    extends HdfsAbstractSink<K, V> implements Sink<V> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HdfsAbstractSequenceFileSink.class);
+
+    protected AtomicLong counter;
+    protected FSDataOutputStream hdfsStream;
+    protected Writer writer = null;
+
+    public abstract KeyValue<HdfsK, HdfsV> convert(KeyValue<K, V> kv);
+
+    @Override
+    public void close() throws Exception {
+       writer.close();
+       super.close();
+    }
+
+    @Override
+    protected void createWriter() throws IOException {
+       writer = getWriter();
+    }
+
+    @Override
+    public void write(Record<V> record) {
+       try {
+            KeyValue<K, V> kv = extractKeyValue(record);
+            KeyValue<HdfsK, HdfsV> keyValue = convert(kv);
+            writer.append(keyValue.getKey(), keyValue.getValue());
+            unackedRecords.put(record);
+        } catch (IOException | InterruptedException e) {
+            LOG.error("Unable to write to file " + getPath(), e);
+            record.fail();
+        }
+    }
+
+    protected Writer getWriter() throws IOException {
+        counter = new AtomicLong(0);
+        List<Option> options = getOptions();
+        return SequenceFile.createWriter(getConfiguration(),
+                options.toArray(new Option[options.size()]));
+     }
+
+    protected List<Option> getOptions() throws IllegalArgumentException, IOException {
+        List<Option> list = new ArrayList<Option>();
+        list.add(Writer.stream(getHdfsStream()));
+
+        if (getCompressionCodec() != null) {
+            list.add(Writer.compression(SequenceFile.CompressionType.RECORD, getCompressionCodec()));
+        }
+        return list;
+    }
+}
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsSequentialTextSink.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsSequentialTextSink.java
new file mode 100644
index 0000000000..84ce09f469
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsSequentialTextSink.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs.sink.seq;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.io.SequenceFile.Writer.Option;
+import org.apache.hadoop.io.Text;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.KeyValue;
+
+/**
+ * This Sink should be used when the records are originating from a sequential source,
+ * and we want to retain the record sequence. This class uses the record's sequence id as
+ * the sequence id in the HDFS Sequence File if it is available; if not, a sequence id is
+ * auto-generated for each new record.
+ */
+public class HdfsSequentialTextSink extends HdfsAbstractSequenceFileSink<Long, String, LongWritable, Text> {
+
+    private AtomicLong counter;
+
+    @Override
+    public Writer getWriter() throws IOException {
+       counter = new AtomicLong(0);
+
+       return SequenceFile
+                .createWriter(
+                   getConfiguration(),
+                   getOptions().toArray(new Option[getOptions().size()]));
+    }
+
+    @Override
+    protected List<Option> getOptions() throws IllegalArgumentException, IOException {
+        List<Option> opts = super.getOptions();
+        opts.add(Writer.keyClass(LongWritable.class));
+        opts.add(Writer.valueClass(Text.class));
+        return opts;
+    }
+
+    @Override
+    public KeyValue<Long, String> extractKeyValue(Record<String> record) {
+       Long sequence = record.getRecordSequence().orElseGet(() -> counter.incrementAndGet());
+       return new KeyValue<>(sequence, record.getValue());
+    }
+
+    @Override
+    public KeyValue<LongWritable, Text> convert(KeyValue<Long, String> kv) {
+       return new KeyValue<>(new LongWritable(kv.getKey()), new Text(kv.getValue()));
+    }
+}
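
The resulting files can be inspected with a plain SequenceFile reader; a
sketch, assuming a hypothetical output file written by this sink:

    Configuration conf = new Configuration();
    Path file = new Path("/pulsar/topicA/topicA-1535057507000"); // hypothetical file
    try (SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(file))) {
        LongWritable key = new LongWritable();
        Text value = new Text();
        while (reader.next(key, value)) {
            System.out.println(key.get() + " -> " + value); // sequence id -> record payload
        }
    }
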
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsTextSink.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsTextSink.java
new file mode 100644
index 0000000000..84ebc07c44
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsTextSink.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs.sink.seq;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.io.SequenceFile.Writer.Option;
+import org.apache.hadoop.io.Text;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.KeyValue;
+
+/**
+ * A Simple Sink class for Hdfs Sequence File.
+ */
+public class HdfsTextSink extends
+     HdfsAbstractSequenceFileSink<String, String, Text, Text> {
+
+    @Override
+    protected List<Option> getOptions() throws IllegalArgumentException, IOException {
+        List<Option> opts = super.getOptions();
+        opts.add(Writer.keyClass(Text.class));
+        opts.add(Writer.valueClass(Text.class));
+        return opts;
+    }
+
+    @Override
+    public KeyValue<String, String> extractKeyValue(Record<String> record) {
+       String key = record.getKey().orElseGet(() -> record.getValue());
+       return new KeyValue<>(key, record.getValue());
+    }
+
+    @Override
+    public KeyValue<Text, Text> convert(KeyValue<String, String> kv) {
+       return new KeyValue<>(new Text(kv.getKey()), new Text(kv.getValue()));
+    }
+}
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/package-info.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/package-info.java
new file mode 100644
index 0000000000..025311ab1f
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs.sink.seq;
\ No newline at end of file
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/text/HdfsAbstractTextFileSink.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/text/HdfsAbstractTextFileSink.java
new file mode 100644
index 0000000000..6fb50a5b17
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/text/HdfsAbstractTextFileSink.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs.sink.text;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.KeyValue;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.hdfs.sink.HdfsAbstractSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for HDFS Sinks that write their contents to HDFS as Text Files.
+ *
+ * @param <K>
+ * @param <V>
+ */
+public abstract class HdfsAbstractTextFileSink<K, V> extends HdfsAbstractSink<K, V> implements Sink<V> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HdfsAbstractTextFileSink.class);
+
+    protected OutputStreamWriter writer;
+
+    @Override
+    protected void createWriter() throws IOException {
+        writer = new OutputStreamWriter(new BufferedOutputStream(openHdfsStream()), getEncoding());
+    }
+
+    @Override
+    public void close() throws Exception {
+        writer.close();
+        super.close();
+    }
+
+    @Override
+    public void write(Record<V> record) {
+       try {
+           KeyValue<K, V> kv = extractKeyValue(record);
+           writer.write(kv.getValue().toString());
+
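+           // a bare char field defaults to the NUL character, so an
+           // unconfigured separator writes records back-to-back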
+           if (hdfsSinkConfig.getSeparator() != '\u0000') {
+              writer.write(hdfsSinkConfig.getSeparator());
+           }
+           unackedRecords.put(record);
+        } catch (IOException | InterruptedException e) {
+            LOG.error("Unable to write to file " + getPath(), e);
+            record.fail();
+        }
+    }
+
+    private OutputStream openHdfsStream() throws IOException {
+       if (hdfsSinkConfig.getCompression() != null) {
+          return getCompressionCodec().createOutputStream(getHdfsStream());
+       } else {
+          return getHdfsStream();
+       }
+    }
+}
\ No newline at end of file
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/text/HdfsStringSink.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/text/HdfsStringSink.java
new file mode 100644
index 0000000000..355c6df2db
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/text/HdfsStringSink.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs.sink.text;
+
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.KeyValue;
+import org.apache.pulsar.io.core.Sink;
+
+/**
+ * A simple sink that writes each record's value to an HDFS text file as a String.
+ */
+public class HdfsStringSink extends HdfsAbstractTextFileSink<String, String> implements Sink<String> {
+    @Override
+    public KeyValue<String, String> extractKeyValue(Record<String> record) {
+        // Fall back to the record value when no explicit key is present
+        String key = record.getKey().orElseGet(record::getValue);
+        return new KeyValue<>(key, record.getValue());
+    }
+}
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/text/package-info.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/text/package-info.java
new file mode 100644
index 0000000000..9ade5707da
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/text/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs.sink.text;
\ No newline at end of file
diff --git a/pulsar-io/hdfs/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/hdfs/src/main/resources/META-INF/services/pulsar-io.yaml
new file mode 100644
index 0000000000..f2a2b5572d
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+name: hdfs
+description: Writes data into HDFS
+sinkClass: org.apache.pulsar.io.hdfs.sink.text.HdfsStringSink
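
This descriptor is what lets Pulsar discover the connector and map the name "hdfs" to HdfsStringSink. Outside the runtime, the same sink can be driven directly through the Sink interface, mirroring what the tests later in this diff do. A minimal sketch (the config values and the null SinkContext are illustrative; the Pulsar runtime supplies a real context):

import java.util.HashMap;
import java.util.Map;

import org.apache.pulsar.io.hdfs.sink.text.HdfsStringSink;

public class HdfsStringSinkExample {
    public static void main(String[] args) throws Exception {
        Map<String, Object> config = new HashMap<>();
        config.put("hdfsConfigResources", "/etc/hadoop/core-site.xml"); // illustrative path
        config.put("directory", "/tmp/testing");
        config.put("filenamePrefix", "prefix");

        HdfsStringSink sink = new HdfsStringSink();
        sink.open(config, null); // the runtime would pass a real SinkContext here
        // sink.write(record) is invoked by the runtime for each incoming message
        sink.close();
    }
}
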
diff --git a/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/AbstractHdfsSinkTest.java b/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/AbstractHdfsSinkTest.java
new file mode 100644
index 0000000000..32085dd5dc
--- /dev/null
+++ b/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/AbstractHdfsSinkTest.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs.sink;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.SinkContext;
+import org.mockito.Mock;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.testng.annotations.BeforeMethod;
+
+/**
+ * Simple base class for all the HDFS sink test cases.
+ * Provides utility methods for sending records to the sink.
+ *
+ */
+public abstract class AbstractHdfsSinkTest<K, V> {
+    
+    @Mock
+    protected SinkContext mockSinkContext;
+    
+    @Mock
+    protected Record<V> mockRecord;
+    
+    protected Map<String, Object> map;
+    protected HdfsAbstractSink<K, V> sink;
+    
+    @SuppressWarnings("unchecked")
+    @BeforeMethod
+    public final void setUp() throws Exception {
+        map = new HashMap<>();
+        map.put("hdfsConfigResources", "../incubator-pulsar/pulsar-io/hdfs/src/test/resources/hadoop/core-site.xml,"
+                + "../incubator-pulsar/pulsar-io/hdfs/src/test/resources/hadoop/hdfs-site.xml");
+        map.put("directory", "/tmp/testing");
+        map.put("filenamePrefix", "prefix");
+        
+        mockSinkContext = mock(SinkContext.class);
+        
+        mockRecord = mock(Record.class);
+        when(mockRecord.getRecordSequence()).thenAnswer(new Answer<Optional<Long>>() {
+            long sequenceCounter = 0;
+            public Optional<Long> answer(InvocationOnMock invocation) throws Throwable {
+                return Optional.of(sequenceCounter++);
+            }
+        });
+        
+        when(mockRecord.getKey()).thenAnswer(new Answer<Optional<String>>() {
+            long sequenceCounter = 0;
+            public Optional<String> answer(InvocationOnMock invocation) throws Throwable {
+                return Optional.of("key-" + sequenceCounter++);
+            }
+        });
+        
+        when(mockRecord.getValue()).thenAnswer(new Answer<String>() {
+            long sequenceCounter = 0;
+            public String answer(InvocationOnMock invocation) throws Throwable {
+                return "value-" + sequenceCounter++ + "-" + UUID.randomUUID();
+            }
+        });
+        
+        createSink();
+    }
+
+    protected abstract void createSink();
+
+    protected final void send(int numRecords) throws Exception {
+        for (int idx = 0; idx < numRecords; idx++) {
+            sink.write(mockRecord);
+        }
+    }
+    
+    protected final void runFor(int numSeconds) throws InterruptedException {
+        Producer producer = new Producer();
+        producer.start();
+        Thread.sleep(numSeconds * 1000); // Run for N seconds
+        producer.halt();
+        producer.join(2000);
+    }
+    
+    protected final class Producer extends Thread {
+        // volatile so the flag set by halt() is visible to the producer thread
+        public volatile boolean keepRunning = true;
+        @Override
+        public void run() {
+            while (keepRunning) {
+                try {
+                    sink.write(mockRecord);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+        
+        public void halt() {
+            keepRunning = false;
+        }
+    }
+}
diff --git a/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/HdfsSinkConfigTests.java b/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/HdfsSinkConfigTests.java
new file mode 100644
index 0000000000..2f0b3f3e9a
--- /dev/null
+++ b/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/HdfsSinkConfigTests.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs.sink;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.pulsar.io.hdfs.Compression;
+import org.testng.annotations.Test;
+
+import com.fasterxml.jackson.databind.exc.InvalidFormatException;
+
+
+public class HdfsSinkConfigTests {
+       
+       @Test
+       public final void loadFromYamlFileTest() throws IOException {
+               File yamlFile = getFile("sinkConfig.yaml");
+               HdfsSinkConfig config = HdfsSinkConfig.load(yamlFile.getAbsolutePath());
+               assertNotNull(config);
+               assertEquals("core-site.xml", config.getHdfsConfigResources());
+               assertEquals("/foo/bar", config.getDirectory());
+               assertEquals("prefix", config.getFilenamePrefix());
+               assertEquals(Compression.SNAPPY, config.getCompression());
+       }
+       
+       @Test
+       public final void loadFromMapTest() throws IOException {
+               Map<String, Object> map = new HashMap<String, Object> ();
+               map.put("hdfsConfigResources", "core-site.xml");
+               map.put("directory", "/foo/bar");
+               map.put("filenamePrefix", "prefix");
+               map.put("compression", "SNAPPY");
+               
+               HdfsSinkConfig config = HdfsSinkConfig.load(map);
+               assertNotNull(config);
+               assertEquals("core-site.xml", config.getHdfsConfigResources());
+               assertEquals("/foo/bar", config.getDirectory());
+               assertEquals("prefix", config.getFilenamePrefix());
+               assertEquals(Compression.SNAPPY, config.getCompression());
+       }
+       
+       @Test
+       public final void validValidateTest() throws IOException {
+               Map<String, Object> map = new HashMap<String, Object> ();
+               map.put("hdfsConfigResources", "core-site.xml");
+               map.put("directory", "/foo/bar");
+               map.put("filenamePrefix", "prefix");
+               map.put("fileExtension", ".txt");
+               
+               HdfsSinkConfig config = HdfsSinkConfig.load(map);
+               config.validate();
+       }
+       
+       @Test(expectedExceptions = IllegalArgumentException.class,
+                 expectedExceptionsMessageRegExp = "Required property not set.")
+       public final void missingDirectoryValidateTest() throws IOException {
+               Map<String, Object> map = new HashMap<String, Object> ();
+               map.put("hdfsConfigResources", "core-site.xml");
+               
+               HdfsSinkConfig config = HdfsSinkConfig.load(map);
+               config.validate();
+       }
+       
+       @Test(expectedExceptions = IllegalArgumentException.class,
+                 expectedExceptionsMessageRegExp = "Required property not set.")
+       public final void missingHdfsConfigsValidateTest() throws IOException {
+               Map<String, Object> map = new HashMap<String, Object> ();
+               map.put("directory", "/foo/bar");
+               
+               HdfsSinkConfig config = HdfsSinkConfig.load(map);
+               config.validate();
+       }
+       
+       @Test(expectedExceptions = InvalidFormatException.class)
+       public final void invalidCodecValidateTest() throws IOException {
+               Map<String, Object> map = new HashMap<String, Object> ();
+               map.put("hdfsConfigResources", "core-site.xml");
+               map.put("directory", "/foo/bar");
+               map.put("filenamePrefix", "prefix");
+               map.put("fileExtension", ".txt");
+               map.put("compression", "bad value");
+               
+               HdfsSinkConfig config = HdfsSinkConfig.load(map);
+               config.validate();
+       }
+       
+       @Test(expectedExceptions = IllegalArgumentException.class,
+                 expectedExceptionsMessageRegExp = "Sync Interval cannot be negative")
+       public final void invalidSyncIntervalTest() throws IOException {
+               Map<String, Object> map = new HashMap<String, Object> ();
+               map.put("hdfsConfigResources", "core-site.xml");
+               map.put("directory", "/foo/bar");
+               map.put("filenamePrefix", "prefix");
+               map.put("fileExtension", ".txt");
+               map.put("syncInterval", -1);
+               
+               HdfsSinkConfig config = HdfsSinkConfig.load(map);
+               config.validate();
+       }
+       
+       @Test(expectedExceptions = IllegalArgumentException.class,
+                 expectedExceptionsMessageRegExp = "Max Pending Records must be a positive integer")
+       public final void invalidMaxPendingRecordsTest() throws IOException {
+               Map<String, Object> map = new HashMap<String, Object> ();
+               map.put("hdfsConfigResources", "core-site.xml");
+               map.put("directory", "/foo/bar");
+               map.put("filenamePrefix", "prefix");
+               map.put("fileExtension", ".txt");
+               map.put("maxPendingRecords", 0);
+               
+               HdfsSinkConfig config = HdfsSinkConfig.load(map);
+               config.validate();
+       }
+       
+       @Test(expectedExceptions = IllegalArgumentException.class,
+                 expectedExceptionsMessageRegExp = "Values for both kerberosUserPrincipal & keytab are required.")
+       public final void kerberosValidateTest() throws IOException {
+               Map<String, Object> map = new HashMap<String, Object> ();
+               map.put("hdfsConfigResources", "core-site.xml");
+               map.put("directory", "/foo/bar");
+               map.put("filenamePrefix", "prefix");
+               map.put("keytab", "/etc/keytab/hdfs.client.ktab");
+               
+               HdfsSinkConfig config = HdfsSinkConfig.load(map);
+               config.validate();
+       }
+       
+       private File getFile(String name) {
+               ClassLoader classLoader = getClass().getClassLoader();
+               return new File(classLoader.getResource(name).getFile());
+       }
+}
diff --git a/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsSequentialSinkTests.java b/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsSequentialSinkTests.java
new file mode 100644
index 0000000000..d54e5ec0ef
--- /dev/null
+++ b/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsSequentialSinkTests.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs.sink.seq;
+
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertNotNull;
+
+import org.apache.pulsar.io.hdfs.sink.AbstractHdfsSinkTest;
+import org.testng.annotations.Test;
+
+public class HdfsSequentialSinkTests extends AbstractHdfsSinkTest<Long, String> {
+       
+    @Override
+    protected void createSink() {
+        sink = new HdfsSequentialTextSink();
+    }
+    
+    @Test(enabled = false)
+       public final void write100Test() throws Exception {
+               map.put("filenamePrefix", "write100Test-seq");
+               map.put("fileExtension", ".seq");
+               map.put("syncInterval", 1000);
+               sink.open(map, mockSinkContext);
+               
+               assertNotNull(sink);
+               send(100);
+               
+               Thread.sleep(2000);
+               verify(mockRecord, times(100)).ack();
+               sink.close();
+       }
+       
+       @Test(enabled = false)
+       public final void write5000Test() throws Exception {
+               map.put("filenamePrefix", "write5000Test-seq");
+               map.put("fileExtension", ".seq");
+               map.put("syncInterval", 1000);
+               sink.open(map, mockSinkContext);
+               
+               assertNotNull(sink);
+               send(5000);
+               
+               Thread.sleep(2000);
+               verify(mockRecord, times(5000)).ack();
+               sink.close();
+       }
+       
+       @Test(enabled = false)
+       public final void tenSecondTest() throws Exception {
+               map.put("filenamePrefix", "tenSecondTest-seq");
+               map.put("fileExtension", ".seq");
+               map.put("syncInterval", 1000);
+               sink.open(map, mockSinkContext);
+               runFor(10);     
+               sink.close();
+       }
+       
+       @Test(enabled = false)
+       public final void bzip2CompressionTest() throws Exception {
+               map.put("filenamePrefix", "bzip2CompressionTest-seq");
+               map.put("compression", "BZIP2");
+               map.remove("fileExtension");
+               sink.open(map, mockSinkContext);
+               send(5000);
+               verify(mockRecord, times(5000)).ack();
+       }
+       
+       @Test(enabled = false)
+       public final void deflateCompressionTest() throws Exception {
+               map.put("filenamePrefix", "deflateCompressionTest-seq");
+               map.put("compression", "DEFLATE");
+               map.remove("fileExtension");
+               sink.open(map, mockSinkContext);
+               send(5000);
+               verify(mockRecord, times(5000)).ack();
+       }
+}
diff --git a/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsTextSinkTests.java b/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsTextSinkTests.java
new file mode 100644
index 0000000000..bb720fa828
--- /dev/null
+++ b/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsTextSinkTests.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs.sink.seq;
+
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertNotNull;
+
+import org.apache.pulsar.io.hdfs.sink.AbstractHdfsSinkTest;
+import org.testng.annotations.Test;
+
+public class HdfsTextSinkTests extends AbstractHdfsSinkTest<String, String> {
+    
+    @Override
+    protected void createSink() {
+        sink = new HdfsTextSink();
+    }
+
+    @Test(enabled = false)
+    public final void write100Test() throws Exception {
+        map.put("filenamePrefix", "write100TestText-seq");
+        map.put("fileExtension", ".seq");
+        map.put("syncInterval", 1000);
+        sink.open(map, mockSinkContext);
+        
+        assertNotNull(sink);
+        assertNotNull(mockRecord);
+        send(100);
+        
+        Thread.sleep(2000);
+        verify(mockRecord, times(100)).ack();
+        sink.close();
+    }
+    
+    @Test(enabled = false)
+    public final void write5000Test() throws Exception {
+        map.put("filenamePrefix", "write5000TestText-seq");
+        map.put("fileExtension", ".seq");
+        map.put("syncInterval", 1000);
+        sink.open(map, mockSinkContext);
+        
+        assertNotNull(sink);
+        assertNotNull(mockRecord);
+        send(5000);
+        
+        Thread.sleep(2000);
+        verify(mockRecord, times(5000)).ack();
+        sink.close();
+    }
+    
+    @Test(enabled = false)
+    public final void tenSecondTest() throws Exception {
+        map.put("filenamePrefix", "tenSecondTestText-seq");
+        map.put("fileExtension", ".seq");
+        map.put("syncInterval", 1000);
+        sink.open(map, mockSinkContext);
+        
+        assertNotNull(mockRecord);
+        
+        runFor(10); 
+        sink.close();
+    }
+    
+    @Test(enabled = false)
+    public final void bzip2CompressionTest() throws Exception {
+        map.put("filenamePrefix", "bzip2CompressionTestText-seq");
+        map.put("compression", "BZIP2");
+        map.remove("fileExtension");
+        sink.open(map, mockSinkContext);
+        
+        assertNotNull(mockRecord);
+        
+        send(5000);
+        verify(mockRecord, times(5000)).ack();
+    }
+    
+    @Test(enabled = false)
+    public final void deflateCompressionTest() throws Exception {
+        map.put("filenamePrefix", "deflateCompressionTestText-seq");
+        map.put("compression", "DEFLATE");
+        map.remove("fileExtension");
+        sink.open(map, mockSinkContext);
+        
+        assertNotNull(mockRecord);
+        send(5000);
+        verify(mockRecord, times(5000)).ack();
+    }
+}
diff --git a/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/text/HdfsStringSinkTests.java b/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/text/HdfsStringSinkTests.java
new file mode 100644
index 0000000000..98b8a61e6b
--- /dev/null
+++ b/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/text/HdfsStringSinkTests.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs.sink.text;
+
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import org.apache.pulsar.io.hdfs.sink.AbstractHdfsSinkTest;
+import org.testng.annotations.Test;
+
+public class HdfsStringSinkTests extends AbstractHdfsSinkTest<String, String> {
+       
+    @Override
+    protected void createSink() {
+        sink = new HdfsStringSink();
+    }
+    
+    @Test(enabled = false)
+       public final void write5000Test() throws Exception {
+               map.put("filenamePrefix", "write5000Test");
+               map.put("fileExtension", ".txt");
+               map.put("separator", '\n');
+               sink.open(map, mockSinkContext);
+               send(5000);
+               sink.close();
+               verify(mockRecord, times(5000)).ack();
+       }
+       
+    @Test(enabled = false)
+       public final void fiveByTwoThousandTest() throws Exception {
+               map.put("filenamePrefix", "fiveByTwoThousandTest");
+               map.put("fileExtension", ".txt");
+               map.put("separator", '\n');
+               sink.open(map, mockSinkContext);
+               
+               for (int idx = 1; idx < 6; idx++) {
+                       send(2000);
+               }
+               sink.close();
+               verify(mockRecord, times(2000 * 5)).ack();
+       }
+       
+    @Test(enabled = false)
+       public final void tenSecondTest() throws Exception {
+               map.put("filenamePrefix", "tenSecondTest");
+               map.put("fileExtension", ".txt");
+               map.put("separator", '\n');
+               sink.open(map, mockSinkContext);
+               runFor(10);     
+               sink.close();
+       }
+       
+    @Test(enabled = false)
+       public final void maxPendingRecordsTest() throws Exception {
+               map.put("filenamePrefix", "maxPendingRecordsTest");
+               map.put("fileExtension", ".txt");
+               map.put("separator", '\n');
+               map.put("maxPendingRecords", 500);
+               sink.open(map, mockSinkContext);
+               runFor(10);     
+               sink.close();
+       }
+       
+    @Test(enabled = false)
+       public final void bzip2CompressionTest() throws Exception {
+               map.put("filenamePrefix", "bzip2CompressionTest");
+               map.put("compression", "BZIP2");
+               map.remove("fileExtension");
+               map.put("separator", '\n');
+               sink.open(map, mockSinkContext);
+               send(5000);
+               sink.close();
+               verify(mockRecord, times(5000)).ack();
+       }
+       
+    @Test(enabled = false)
+       public final void deflateCompressionTest() throws Exception {
+               map.put("filenamePrefix", "deflateCompressionTest");
+               map.put("compression", "DEFLATE");
+               map.put("fileExtension", ".deflate");
+               map.put("separator", '\n');
+               sink.open(map, mockSinkContext);
+               send(50000);
+               sink.close();
+               verify(mockRecord, times(50000)).ack();
+       }
+}
diff --git a/pulsar-io/hdfs/src/test/resources/hadoop/core-site.xml b/pulsar-io/hdfs/src/test/resources/hadoop/core-site.xml
new file mode 100644
index 0000000000..31d1e98c47
--- /dev/null
+++ b/pulsar-io/hdfs/src/test/resources/hadoop/core-site.xml
@@ -0,0 +1,32 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<configuration>
+    <property>
+        <name>fs.defaultFS</name>
+        <value>hdfs://0.0.0.0:8020</value>
+    </property>
+    <property>
+        <name>io.compression.codecs</name>
+        <value>org.apache.hadoop.io.compress.GzipCodec,
+               org.apache.hadoop.io.compress.DefaultCodec,
+               org.apache.hadoop.io.compress.SnappyCodec</value>
+   </property>
+</configuration>
diff --git a/pulsar-io/hdfs/src/test/resources/hadoop/hdfs-site.xml b/pulsar-io/hdfs/src/test/resources/hadoop/hdfs-site.xml
new file mode 100644
index 0000000000..bb722f1f63
--- /dev/null
+++ b/pulsar-io/hdfs/src/test/resources/hadoop/hdfs-site.xml
@@ -0,0 +1,34 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<configuration>
+    <property>
+        <name>dfs.replication</name>
+        <value>1</value>
+    </property>
+    <property>
+       <name>dfs.client.use.datanode.hostname</name>
+       <value>true</value>
+    </property>
+    <property>
+       <name>dfs.support.append</name>
+       <value>true</value>
+   </property>
+</configuration>
diff --git a/pulsar-io/hdfs/src/test/resources/sinkConfig.yaml b/pulsar-io/hdfs/src/test/resources/sinkConfig.yaml
new file mode 100644
index 0000000000..5a19ee0c5f
--- /dev/null
+++ b/pulsar-io/hdfs/src/test/resources/sinkConfig.yaml
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+{
+"hdfsConfigResources": "core-site.xml",
+"directory": "/foo/bar",
+"filenamePrefix": "prefix",
+"compression": "SNAPPY"
+}
\ No newline at end of file
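
The yaml above maps one-to-one onto HdfsSinkConfig, and loading it follows the same pattern the config tests earlier in this diff exercise. A minimal sketch (the file path is illustrative):

import org.apache.pulsar.io.hdfs.sink.HdfsSinkConfig;

public class HdfsSinkConfigExample {
    public static void main(String[] args) throws Exception {
        // Load the yaml shown above and fail fast on missing required properties
        HdfsSinkConfig config = HdfsSinkConfig.load("/path/to/sinkConfig.yaml"); // illustrative path
        config.validate(); // throws IllegalArgumentException if directory or hdfsConfigResources is unset
    }
}
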
diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml
index e89cc0271a..ae56f92562 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/pom.xml
@@ -38,6 +38,7 @@
     <module>kafka</module>
     <module>rabbitmq</module>
     <module>kinesis</module>
+    <module>hdfs</module>
     <module>jdbc</module>
     <module>data-genenator</module>
   </modules>
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/HdfsContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/HdfsContainer.java
new file mode 100644
index 0000000000..58f781f3ba
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/HdfsContainer.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.containers;
+
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
+
+public class HdfsContainer extends ChaosContainer<HdfsContainer> {
+
+       public static final String NAME = "HDFS";
+       static final Integer[] PORTS = { 8020, 8032, 8088, 9000, 10020, 19888, 50010, 50020, 50070, 50090 };
+       
+       private static final String IMAGE_NAME = "harisekhon/hadoop:latest";
+    
+       public HdfsContainer(String clusterName) {
+               super(clusterName, IMAGE_NAME);
+       }
+       
+       @Override
+    public String getContainerName() {
+        return clusterName;
+    }
+       
+       @Override
+       protected void configure() {
+               super.configure();
+               this.withNetworkAliases(NAME)
+                   .withExposedPorts(PORTS)
+                   .withCreateContainerCmdModifier(createContainerCmd -> {
+                       createContainerCmd.withHostName(NAME);
+                       createContainerCmd.withName(clusterName + "-" + NAME);
+                   })
+                   .waitingFor(new HostPortWaitStrategy());
+       }
+
+}
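
The container follows the standard Testcontainers lifecycle, so standing it up in a test is just start()/stop() on the instance. A minimal sketch (the cluster name is illustrative):

import org.apache.pulsar.tests.integration.containers.HdfsContainer;

public class HdfsContainerExample {
    public static void main(String[] args) {
        HdfsContainer hdfs = new HdfsContainer("test-cluster");
        try {
            // Pulls harisekhon/hadoop:latest and blocks until the exposed ports accept connections
            hdfs.start();
            // ... exercise the HDFS sink against the containerized cluster ...
        } finally {
            hdfs.stop();
        }
    }
}
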
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 7b3cd4f1b1..17634a1ab8 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -72,7 +72,12 @@ public void testCassandraSink() throws Exception {
     public void testCassandraArchiveSink() throws Exception {
         testSink(new CassandraSinkArchiveTester(), false);
     }
-
+    
+    @Test(enabled = false)
+    public void testHdfsSink() throws Exception {
+        testSink(new HdfsSinkTester(), false);
+    }
+    
     @Test
     public void testJdbcSink() throws Exception {
         testSink(new JdbcSinkTester(), true);
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/HdfsSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/HdfsSinkTester.java
new file mode 100644
index 0000000000..46c5f242df
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/HdfsSinkTester.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import java.util.Map;
+
+import org.apache.pulsar.tests.integration.containers.HdfsContainer;
+import org.testcontainers.containers.GenericContainer;
+
+import static com.google.common.base.Preconditions.checkState;
+
+public class HdfsSinkTester extends SinkTester {
+       
+       private static final String NAME = "HDFS";
+       
+       private HdfsContainer hdfsCluster;
+       
+       public HdfsSinkTester() {
+               super(SinkType.HDFS);
+               
+               // TODO How do I get the core-site.xml and hdfs-site.xml files from the container?
+               sinkConfig.put("hdfsConfigResources", "");
+               sinkConfig.put("directory", "/testing/test");
+       }
+
+       @Override
+       public void findSinkServiceContainer(Map<String, GenericContainer<?>> containers) {
+               GenericContainer<?> container = containers.get(NAME);
+               checkState(container instanceof HdfsContainer, "No HDFS service found in the cluster");
+               this.hdfsCluster = (HdfsContainer) container;
+       }
+
+       @Override
+       public void prepareSink() throws Exception {
+               // Create the test directory and hand ownership to the test user
+               hdfsCluster.execInContainer("/hadoop/bin/hdfs", "dfs", "-mkdir", "/tmp/testing");
+               hdfsCluster.execInContainer("/hadoop/bin/hdfs", "dfs", "-chown", "tester:testing", "/tmp/testing");
+               
+               // Execute all future commands as the "tester" user. Note: each execInContainer
+               // call runs in its own process, so this export will not persist; in practice
+               // HADOOP_USER_NAME would need to be set on the container environment itself.
+               hdfsCluster.execInContainer("export HADOOP_USER_NAME=tester");
+       }
+
+       @Override
+       public void validateSinkResult(Map<String, String> kvs) {
+               // TODO Auto-generated method stub
+
+       }
+
+}
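
The validateSinkResult method above is still a stub. One plausible way to fill it in (a sketch under the assumption that the sink's output lands under /tmp/testing as plain text; Container is org.testcontainers.containers.Container, whose execInContainer/ExecResult API is already used by this tester) is to cat the files back out of the container and check that every expected value appears:

    @Override
    public void validateSinkResult(Map<String, String> kvs) {
        try {
            // Read every file the sink wrote back out of the container as text
            Container.ExecResult result =
                hdfsCluster.execInContainer("/hadoop/bin/hdfs", "dfs", "-cat", "/tmp/testing/*");
            String contents = result.getStdout();
            for (Map.Entry<String, String> entry : kvs.entrySet()) {
                checkState(contents.contains(entry.getValue()),
                    "Value " + entry.getValue() + " not found in HDFS output");
            }
        } catch (Exception e) {
            throw new RuntimeException("Failed to read back HDFS sink output", e);
        }
    }
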
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
index 098b8bf7c5..7f4b2d9a1b 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
@@ -33,7 +33,8 @@
         UNDEFINED,
         CASSANDRA,
         KAFKA,
-        JDBC
+        JDBC,
+        HDFS
     }
 
     protected final SinkType sinkType;
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
index 147f273abb..438c96e2a1 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
@@ -20,6 +20,7 @@
 
 import java.util.Map;
 import org.apache.pulsar.tests.integration.containers.CassandraContainer;
+import org.apache.pulsar.tests.integration.containers.HdfsContainer;
import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec.PulsarClusterSpecBuilder;
 import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
 import org.testcontainers.containers.GenericContainer;


 
