[GitHub] david-streamlio commented on a change in pull request #2409: Added HDFS Sink

2018-08-27 GitBox
URL: https://github.com/apache/incubator-pulsar/pull/2409#discussion_r213064055
 
 

 ##
 File path: 
pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConfig.java
 ##
 @@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs;
+
+import java.io.Serializable;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * Configuration object for all HDFS components.
+ */
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+@Accessors(chain = true)
+public abstract class AbstractHdfsConfig implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * A file or comma-separated list of files that contain the Hadoop file system configuration.
+     * Without this, Hadoop will search the classpath for a 'core-site.xml' and 'hdfs-site.xml' file
+     * or will revert to a default configuration.
+     */
+    private String hdfsConfigResources;
+
+    /**
+     * The HDFS directory from which files should be read or to which they should be written.
+     */
+    private String directory;
+
+    /**
+     * The character encoding for the files, e.g. UTF-8, ASCII, etc.
+     */
+    private String encoding;
+
+    /**
+     * The compression codec used to compress/decompress the files on HDFS.
+     */
+    private Compression compression;
+
+    /**
+     * The Kerberos user principal account to use for authentication.
+     */
+    private String kerberosUserPrincipal;
+
+    /**
+     * The full pathname to the Kerberos keytab file to use for authentication.
+     */
+    private String keytab;
+
+    public void validate() {
+        if (StringUtils.isEmpty(hdfsConfigResources) || StringUtils.isEmpty(directory)) {
 
 Review comment:
   I will change the comment, since we cannot assume that this Sink will be 
executing on a node that has the Hadoop client configuration files available.
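A minimal sketch of the point above, assuming the sink must be told explicitly where its Hadoop configuration lives rather than relying on 'core-site.xml'/'hdfs-site.xml' being on the classpath. The helper `HdfsResourcesSketch.parseResources` is hypothetical (not part of the PR): it splits the comma-separated `hdfsConfigResources` value into trimmed paths and rejects a value with no usable entries.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the PR: turns the comma-separated
// hdfsConfigResources value into individual resource paths.
final class HdfsResourcesSketch {

    private HdfsResourcesSketch() {
    }

    /**
     * Returns the trimmed, non-empty entries of a comma-separated list.
     * Throws IllegalArgumentException when no usable entry remains, because
     * the node running the sink may not have Hadoop client config files.
     */
    static List<String> parseResources(String hdfsConfigResources) {
        List<String> resources = new ArrayList<>();
        if (hdfsConfigResources != null) {
            for (String part : hdfsConfigResources.split(",")) {
                String trimmed = part.trim();
                if (!trimmed.isEmpty()) {
                    resources.add(trimmed);
                }
            }
        }
        if (resources.isEmpty()) {
            throw new IllegalArgumentException("hdfsConfigResources must list at least one file.");
        }
        return resources;
    }
}
```

Each returned path could then be handed to Hadoop's configuration loader; that wiring is left out of the sketch.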


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] david-streamlio commented on a change in pull request #2409: Added HDFS Sink

2018-08-23 GitBox
URL: https://github.com/apache/incubator-pulsar/pull/2409#discussion_r212455850
 
 

 ##
 File path: 
tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/HdfsContainer.java
 ##
 @@ -0,0 +1,35 @@
+package org.apache.pulsar.tests.integration.containers;
 
 Review comment:
   Fixed




[GitHub] david-streamlio commented on a change in pull request #2409: Added HDFS Sink

2018-08-23 GitBox
URL: https://github.com/apache/incubator-pulsar/pull/2409#discussion_r212455767
 
 

 ##
 File path: 
pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/HdfsSinkConfigTests.java
 ##
 @@ -0,0 +1,175 @@
+package org.apache.pulsar.io.hdfs.sink;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class HdfsSinkConfigTests {
+
+    @Rule
+    public ExpectedException thrown = ExpectedException.none();
+
+    @Test
+    public final void loadFromYamlFileTest() throws IOException {
+        File yamlFile = getFile("sinkConfig.yaml");
 
 Review comment:
   Fixed, and added a "contrib-check" profile to the main pom.xml file to
validate Checkstyle compliance.




[GitHub] david-streamlio commented on a change in pull request #2409: Added HDFS Sink

2018-08-23 GitBox
URL: https://github.com/apache/incubator-pulsar/pull/2409#discussion_r212455521
 
 

 ##
 File path: 
pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConfig.java
 ##
 @@ -0,0 +1,172 @@
+package org.apache.pulsar.io.hdfs;
+
+import java.io.Serializable;
+import java.util.stream.Stream;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DeflateCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.compress.Lz4Codec;
+import org.apache.hadoop.io.compress.SnappyCodec;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+@Accessors(chain = true)
+public abstract class AbstractHdfsConfig implements Serializable {
+
+    public static final String BZIP2 = "BZip2";
+    public static final String DEFLATE = "Deflate";
+    public static final String GZIP = "Gzip";
+    public static final String LZ4 = "Lz4";
+    public static final String SNAPPY = "Snappy";
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * A file or comma-separated list of files that contain the Hadoop file system configuration. Without this, Hadoop
+     * will search the classpath for a 'core-site.xml' and 'hdfs-site.xml' file or will revert to a default configuration.
+     */
+    protected String hdfsConfigResources;
+
+    /**
+     * The HDFS directory from which files should be read or to which they should be written.
+     */
+    protected String directory;
+
+    /**
+     * The character encoding for the files, e.g. UTF-8, ASCII, etc.
+     */
+    protected String encoding;
+
+    /**
+     * The compression codec used to compress/decompress the files on HDFS.
+     */
+    protected String compression;
+
+    /**
+     * The Kerberos user principal account to use for authentication.
+     */
+    protected String kerberosUserPrincipal;
+
+    /**
+     * The full pathname to the Kerberos keytab file to use for authentication.
+     */
+    protected String keytab;
+   
+    public void validate() {
+        if (StringUtils.isEmpty(hdfsConfigResources) || StringUtils.isEmpty(directory)) {
+            throw new IllegalArgumentException("Required property not set.");
+        }
+
+        if (StringUtils.isNotEmpty(compression)) {
+            if (!Stream.of(BZIP2, DEFLATE, GZIP, LZ4, SNAPPY).anyMatch(compression::equalsIgnoreCase)) {
+                throw new IllegalArgumentException("Invalid compression codec specified. "
+                        + "Valid values are 'BZip2', 'Deflate', 'Gzip', 'Lz4', or 'Snappy'.");
+            }
+        }
+
+        if ((StringUtils.isNotEmpty(kerberosUserPrincipal) && StringUtils.isEmpty(keytab))
+                || (StringUtils.isEmpty(kerberosUserPrincipal) && StringUtils.isNotEmpty(keytab))) {
+            throw new IllegalArgumentException("Values for both kerberosUserPrincipal and keytab are required.");
+        }
+    }
+
+    public CompressionCodec getCompressionCodec() {
+        if (StringUtils.isBlank(compression)) {
+            return null;
+        }
+        if (compression.equalsIgnoreCase(BZIP2)) {
+            return new BZip2Codec();
+        }
+        if (compression.equalsIgnoreCase(DEFLATE)) {
+            return new DeflateCodec();
+        }
+        if (compression.equalsIgnoreCase(GZIP)) {
+            return new GzipCodec();
+        }
+        if (compression.equalsIgnoreCase(LZ4)) {
+            return new Lz4Codec();
+        }
+        if (compression.equalsIgnoreCase(SNAPPY)) {
+            return new SnappyCodec();
+        }
+        return null;
+    }
+
+   
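The Kerberos check in `validate()` fires when exactly one of `kerberosUserPrincipal` and `keytab` is set, which is an exclusive-or. A standalone sketch of the same checks, assuming plain Java null/empty tests in place of commons-lang `StringUtils`; the class `ValidateSketch` and its static `validate` signature are hypothetical, not the PR's API.

```java
// Hypothetical standalone sketch of the checks in AbstractHdfsConfig.validate().
// Plain Java null/empty checks stand in for commons-lang StringUtils.isEmpty.
final class ValidateSketch {

    // Same spellings as the BZIP2/DEFLATE/GZIP/LZ4/SNAPPY constants in the diff.
    private static final String[] CODECS = {"BZip2", "Deflate", "Gzip", "Lz4", "Snappy"};

    private ValidateSketch() {
    }

    private static boolean isEmpty(String s) {
        return s == null || s.isEmpty();
    }

    static void validate(String hdfsConfigResources, String directory, String compression,
                         String kerberosUserPrincipal, String keytab) {
        if (isEmpty(hdfsConfigResources) || isEmpty(directory)) {
            throw new IllegalArgumentException("Required property not set.");
        }

        // Compression is optional, but when present it must name a known codec.
        if (!isEmpty(compression)) {
            boolean known = false;
            for (String codec : CODECS) {
                if (codec.equalsIgnoreCase(compression)) {
                    known = true;
                    break;
                }
            }
            if (!known) {
                throw new IllegalArgumentException("Invalid compression codec: " + compression);
            }
        }

        // True when exactly one of the two values is set: the same condition as
        // the two-branch (A && !B) || (!A && B) test in the diff, written as an XOR.
        if (isEmpty(kerberosUserPrincipal) != isEmpty(keytab)) {
            throw new IllegalArgumentException(
                    "Values for both kerberosUserPrincipal and keytab are required.");
        }
    }
}
```

Writing the Kerberos condition as `!=` on two booleans keeps the "both or neither" rule on one line and avoids repeating each `StringUtils` call twice.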

[GitHub] david-streamlio commented on a change in pull request #2409: Added HDFS Sink

2018-08-23 GitBox
URL: https://github.com/apache/incubator-pulsar/pull/2409#discussion_r212455382
 
 

 ##
 File path: 
pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConfig.java
 ##
 @@ -0,0 +1,172 @@
+public abstract class AbstractHdfsConfig implements Serializable {
+
+   public static final String BZIP2 = "BZip2";
+   public static final String DEFLATE = "Deflate";
+   public static final String GZIP = "Gzip";
+   public static final String LZ4 = "Lz4";
+   public static final String SNAPPY = "Snappy";
 
 Review comment:
   Fixed
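One way to replace these string constants is an enum. The later revision of this file declares `private Compression compression;`, but the enum below is an assumed shape, not the PR's actual type. This sketch parses the configured value case-insensitively, so "Gzip" and "gzip" both map to `GZIP`, and a blank value means no compression.

```java
import java.util.Locale;

// Hypothetical sketch of a Compression enum; the PR's real type may differ.
enum Compression {
    BZIP2, DEFLATE, GZIP, LZ4, SNAPPY;

    /** Case-insensitive lookup; returns null when no compression is configured. */
    static Compression fromString(String value) {
        if (value == null || value.trim().isEmpty()) {
            return null;
        }
        try {
            return valueOf(value.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            // Enum.valueOf throws for unknown names; rethrow with a clearer message.
            throw new IllegalArgumentException("Invalid compression codec '" + value
                    + "'. Valid values are BZIP2, DEFLATE, GZIP, LZ4, SNAPPY.", e);
        }
    }
}
```

With an enum, mapping each value to a Hadoop `CompressionCodec` (as `getCompressionCodec()` does) becomes a `switch` over the constants, and the compiler can flag a missing case.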




[GitHub] david-streamlio commented on a change in pull request #2409: Added HDFS Sink

2018-08-23 GitBox
URL: https://github.com/apache/incubator-pulsar/pull/2409#discussion_r212455416
 
 

 ##
 File path: 
pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConfig.java
 ##
 @@ -0,0 +1,172 @@
+public abstract class AbstractHdfsConfig implements Serializable {
+
+   public static final String BZIP2 = "BZip2";
+   public static final String DEFLATE = "Deflate";
+   public static final String GZIP = "Gzip";
+   public static final String LZ4 = "Lz4";
+   public static final String SNAPPY = "Snappy";
+   
+   private static final long serialVersionUID = 1L;
 
 Review comment:
   Fixed




[GitHub] david-streamlio commented on a change in pull request #2409: Added HDFS Sink

2018-08-23 GitBox
URL: https://github.com/apache/incubator-pulsar/pull/2409#discussion_r212455487
 
 

 ##
 File path: 
pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConfig.java
 ##
 @@ -0,0 +1,172 @@
+public abstract class AbstractHdfsConfig implements Serializable {
+
+   public static final String BZIP2 = "BZip2";
+   public static final String DEFLATE = "Deflate";
+   public static final String GZIP = "Gzip";
+   public static final String LZ4 = "Lz4";
+   public static final String SNAPPY = "Snappy";
+   
+   private static final long serialVersionUID = 1L;
+   
+   /**
+* A file or comma separated list of files which contains the Hadoop 
file system configuration. Without this, Hadoop
+ * will search the classpath for a 'core-site.xml' and 'hdfs-site.xml' 
file or will revert to a default configuration.
+*/
+   protected String hdfsConfigResources;
 
 Review comment:
   Fixed

