[GitHub] nifi pull request #1712: NIFI-3724 - Add Put/Fetch Parquet Processors

2017-05-01 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/nifi/pull/1712


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request #1712: NIFI-3724 - Add Put/Fetch Parquet Processors

2017-05-01 Thread alopresto
Github user alopresto commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1712#discussion_r114179324
  
--- Diff: nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java ---
@@ -0,0 +1,505 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.FlowFileAccessException;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.hadoop.exception.FailureException;
+import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
+import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter;
+import org.apache.nifi.schema.access.SchemaAccessStrategy;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.util.StopWatch;
+
+import java.io.BufferedInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.nifi.schema.access.SchemaAccessUtils.HWX_CONTENT_ENCODED_SCHEMA;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.HWX_SCHEMA_REF_ATTRIBUTES;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME_PROPERTY;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REGISTRY;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT_PROPERTY;
+
+/**
+ * Base class for processors that write Records to HDFS.
+ */
+@TriggerWhenEmpty // trigger when empty so we have a chance to perform a Kerberos re-login
+@DefaultSettings(yieldDuration = "100 ms") // decrease the default yield since we are triggering when empty

[GitHub] nifi pull request #1712: NIFI-3724 - Add Put/Fetch Parquet Processors

2017-05-01 Thread bbende
Github user bbende commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1712#discussion_r114170838
  
--- Diff: 
nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
 ---
@@ -0,0 +1,505 @@

[GitHub] nifi pull request #1712: NIFI-3724 - Add Put/Fetch Parquet Processors

2017-05-01 Thread alopresto
Github user alopresto commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1712#discussion_r114159819
  
--- Diff: nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java ---
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schema.access;
+
+import org.apache.nifi.avro.AvroSchemaValidator;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+public class SchemaAccessUtils {
+
+    public static final AllowableValue SCHEMA_NAME_PROPERTY = new AllowableValue("schema-name", "Use 'Schema Name' Property",
+            "The name of the Schema to use is specified by the 'Schema Name' Property. The value of this property is used to lookup the Schema in the configured Schema Registry service.");
+    public static final AllowableValue SCHEMA_TEXT_PROPERTY = new AllowableValue("schema-text-property", "Use 'Schema Text' Property",
+            "The text of the Schema itself is specified by the 'Schema Text' Property. The value of this property must be a valid Avro Schema. "
+                    + "If Expression Language is used, the value of the 'Schema Text' property must be valid after substituting the expressions.");
+    public static final AllowableValue HWX_CONTENT_ENCODED_SCHEMA = new AllowableValue("hwx-content-encoded-schema", "HWX Content-Encoded Schema Reference",
+            "The content of the FlowFile contains a reference to a schema in the Schema Registry service. The reference is encoded as a single byte indicating the 'protocol version', "
+                    + "followed by 8 bytes indicating the schema identifier, and finally 4 bytes indicating the schema version, as per the Hortonworks Schema Registry serializers and deserializers, "
+                    + "found at https://github.com/hortonworks/registry");
+    public static final AllowableValue HWX_SCHEMA_REF_ATTRIBUTES = new AllowableValue("hwx-schema-ref-attributes", "HWX Schema Reference Attributes",
+            "The FlowFile contains 3 Attributes that will be used to lookup a Schema from the configured Schema Registry: 'schema.identifier', 'schema.version', and 'schema.protocol.version'");
+
+    public static final PropertyDescriptor SCHEMA_REGISTRY = new PropertyDescriptor.Builder()
+        .name("Schema Registry")
+        .description("Specifies the Controller Service to use for the Schema Registry")
+        .identifiesControllerService(SchemaRegistry.class)
+        .required(false)
+        .build();
+
+    public static final PropertyDescriptor SCHEMA_ACCESS_STRATEGY = new PropertyDescriptor.Builder()
+        .name("Schema Access Strategy")
+        .description("Specifies how to obtain the schema that is to be used for interpreting the data.")
+        .allowableValues(SCHEMA_NAME_PROPERTY, SCHEMA_TEXT_PROPERTY, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA)
+        .defaultValue(SCHEMA_NAME_PROPERTY.getValue())
+        .required(true)
+        .build();
+
+    public static final PropertyDescriptor SCHEMA_NAME = new PropertyDescriptor.Builder()
+        .name("Schema Name")
+        .description("Specifies the name of the schema to lookup in the Schema Registry property")
--- End diff --

I don't mean it has to be difficult to read, but as this is used as the value identifier when the

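The HWX content-encoded schema reference quoted above is a fixed 13-byte header: one byte of 'protocol version', eight bytes of schema identifier, and four bytes of schema version. As a rough sketch of how such a header can be decoded (the class and field names here are illustrative, not NiFi's or the Hortonworks registry's actual API):

```java
import java.nio.ByteBuffer;

public class HwxSchemaRefDecoder {

    /** Parsed fields of the 13-byte content-encoded schema reference. */
    public record SchemaRef(int protocolVersion, long schemaId, int schemaVersion) {}

    // Decode the layout described above: 1 byte protocol version,
    // 8 bytes schema identifier, 4 bytes schema version (big-endian).
    public static SchemaRef decode(byte[] content) {
        ByteBuffer buf = ByteBuffer.wrap(content);
        int protocolVersion = buf.get() & 0xFF;
        long schemaId = buf.getLong();
        int schemaVersion = buf.getInt();
        return new SchemaRef(protocolVersion, schemaId, schemaVersion);
    }

    public static void main(String[] args) {
        byte[] header = ByteBuffer.allocate(13)
                .put((byte) 1).putLong(42L).putInt(3).array();
        SchemaRef ref = decode(header);
        System.out.println(ref.protocolVersion() + " " + ref.schemaId() + " " + ref.schemaVersion());
    }
}
```

With the 'hwx-schema-ref-attributes' strategy, by contrast, the same three values arrive pre-parsed as FlowFile attributes rather than as leading bytes of the content.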
[GitHub] nifi pull request #1712: NIFI-3724 - Add Put/Fetch Parquet Processors

2017-05-01 Thread alopresto
Github user alopresto commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1712#discussion_r114157766
  
--- Diff: 
nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
 ---
@@ -0,0 +1,505 @@

[GitHub] nifi pull request #1712: NIFI-3724 - Add Put/Fetch Parquet Processors

2017-05-01 Thread alopresto
Github user alopresto commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1712#discussion_r114154289
  
--- Diff: nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java ---
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop;
+
+import org.apache.commons.io.input.NullInputStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.FlowFileAccessException;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.hadoop.record.HDFSRecordReader;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.util.StopWatch;
+
+import java.io.BufferedOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Base processor for reading data from HDFS that can be fetched into records.
+ */
+@TriggerWhenEmpty // trigger when empty so we have a chance to perform a Kerberos re-login
+@DefaultSettings(yieldDuration = "100 ms") // decrease the default yield since we are triggering when empty
+public abstract class AbstractFetchHDFSRecord extends AbstractHadoopProcessor {
+
+    public static final PropertyDescriptor FILENAME = new PropertyDescriptor.Builder()
+        .name("filename")
+        .displayName("Filename")
+        .description("The name of the file to retrieve")
+        .required(true)
+        .expressionLanguageSupported(true)
+        .defaultValue("${path}/$(unknown)")
+        .addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
+        .build();
+
+    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+        .name("record-writer")
+        .displayName("Record Writer")
+        .description("The service for writing records to the FlowFile content")
+        .identifiesControllerService(RecordSetWriterFactory.class)
+        .required(true)
+        .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+        .name("success")
+        .description("FlowFiles will be routed to this relationship once they have been updated with the content of the file")
+

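The FILENAME property above supports Expression Language, so its default value is a template resolved against each FlowFile's attributes (e.g. `${path}`). As a simplified illustration of that substitution step only, assuming plain attribute references with no EL functions or nesting (NiFi's real Expression Language engine does far more than this):

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class AttributeExpressionDemo {

    private static final Pattern EXPR = Pattern.compile("\\$\\{([^}]+)\\}");

    // Replace each ${attribute} token with the matching FlowFile attribute
    // value; attributes that are not present resolve to an empty string.
    static String evaluate(String template, Map<String, String> attributes) {
        Matcher m = EXPR.matcher(template);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            m.appendReplacement(out,
                    Matcher.quoteReplacement(attributes.getOrDefault(m.group(1), "")));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        String path = evaluate("${path}/${filename}",
                Map.of("path", "/data/in", "filename", "users.parquet"));
        System.out.println(path); // prints /data/in/users.parquet
    }
}
```

The `ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR` referenced in the descriptor validates that such a template is syntactically well-formed at configure time, before any FlowFile is evaluated against it.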
[GitHub] nifi pull request #1712: NIFI-3724 - Add Put/Fetch Parquet Processors

2017-05-01 Thread alopresto
Github user alopresto commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1712#discussion_r114153996
  
--- Diff: 
nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
 ---
@@ -0,0 +1,505 @@

[GitHub] nifi pull request #1712: NIFI-3724 - Add Put/Fetch Parquet Processors

2017-05-01 Thread joewitt
Github user joewitt commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1712#discussion_r114153914
  
--- Diff: nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java ---
@@ -0,0 +1,156 @@
+    public static final PropertyDescriptor SCHEMA_NAME = new PropertyDescriptor.Builder()
+        .name("Schema Name")
+        .description("Specifies the name of the schema to lookup in the Schema Registry property")
--- End diff --

Just to be clear, the only thing that needs to be truly Human Readable is `displayName`. However, the
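The HWX content-encoded reference described in the quoted property text (one protocol-version byte, followed by an 8-byte schema identifier and a 4-byte schema version) could be parsed as sketched below. This is an illustrative parser only, not NiFi or Hortonworks code; the class and field names are hypothetical, and big-endian byte order is assumed.

```java
import java.nio.ByteBuffer;

// Hypothetical holder for the 13-byte HWX content-encoded schema reference.
class HwxSchemaReference {
    final byte protocolVersion;
    final long schemaId;
    final int schemaVersion;

    HwxSchemaReference(byte protocolVersion, long schemaId, int schemaVersion) {
        this.protocolVersion = protocolVersion;
        this.schemaId = schemaId;
        this.schemaVersion = schemaVersion;
    }

    // Parse the header from the start of the FlowFile content.
    // Assumes content.length >= 13 and big-endian encoding.
    static HwxSchemaReference parse(byte[] content) {
        ByteBuffer buf = ByteBuffer.wrap(content);
        byte protocol = buf.get();   // 1 byte: protocol version
        long id = buf.getLong();     // 8 bytes: schema identifier
        int version = buf.getInt();  // 4 bytes: schema version
        return new HwxSchemaReference(protocol, id, version);
    }
}
```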

[GitHub] nifi pull request #1712: NIFI-3724 - Add Put/Fetch Parquet Processors

2017-05-01 Thread alopresto
Github user alopresto commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1712#discussion_r114152045
  
--- Diff: 
nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java
 ---
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schema.access;
+
+import org.apache.nifi.avro.AvroSchemaValidator;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+public class SchemaAccessUtils {
+
+    public static final AllowableValue SCHEMA_NAME_PROPERTY = new AllowableValue("schema-name", "Use 'Schema Name' Property",
+            "The name of the Schema to use is specified by the 'Schema Name' Property. The value of this property is used to lookup the Schema in the configured Schema Registry service.");
+    public static final AllowableValue SCHEMA_TEXT_PROPERTY = new AllowableValue("schema-text-property", "Use 'Schema Text' Property",
+            "The text of the Schema itself is specified by the 'Schema Text' Property. The value of this property must be a valid Avro Schema. "
+                    + "If Expression Language is used, the value of the 'Schema Text' property must be valid after substituting the expressions.");
+    public static final AllowableValue HWX_CONTENT_ENCODED_SCHEMA = new AllowableValue("hwx-content-encoded-schema", "HWX Content-Encoded Schema Reference",
+            "The content of the FlowFile contains a reference to a schema in the Schema Registry service. The reference is encoded as a single byte indicating the 'protocol version', "
+                    + "followed by 8 bytes indicating the schema identifier, and finally 4 bytes indicating the schema version, as per the Hortonworks Schema Registry serializers and deserializers, "
+                    + "found at https://github.com/hortonworks/registry");
+    public static final AllowableValue HWX_SCHEMA_REF_ATTRIBUTES = new AllowableValue("hwx-schema-ref-attributes", "HWX Schema Reference Attributes",
+            "The FlowFile contains 3 Attributes that will be used to lookup a Schema from the configured Schema Registry: 'schema.identifier', 'schema.version', and 'schema.protocol.version'");
+
+    public static final PropertyDescriptor SCHEMA_REGISTRY = new PropertyDescriptor.Builder()
+            .name("Schema Registry")
+            .description("Specifies the Controller Service to use for the Schema Registry")
+            .identifiesControllerService(SchemaRegistry.class)
+            .required(false)
+            .build();
+
+    public static final PropertyDescriptor SCHEMA_ACCESS_STRATEGY = new PropertyDescriptor.Builder()
+            .name("Schema Access Strategy")
+            .description("Specifies how to obtain the schema that is to be used for interpreting the data.")
+            .allowableValues(SCHEMA_NAME_PROPERTY, SCHEMA_TEXT_PROPERTY, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA)
+            .defaultValue(SCHEMA_NAME_PROPERTY.getValue())
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor SCHEMA_NAME = new PropertyDescriptor.Builder()
+            .name("Schema Name")
+            .description("Specifies the name of the schema to lookup in the Schema Registry property")
--- End diff --

Please use a `name` value without a space and provide a `displayName` field 
with human-facing val
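The pattern being requested can be sketched as below. This is a hypothetical descriptor, assuming the NiFi 1.x `PropertyDescriptor.Builder` API: a stable, space-free `name` serves as the machine-readable identifier, while `displayName` carries the human-facing label.

```java
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.util.StandardValidators;

public class SchemaProperties {
    // "name" is the stable identifier persisted in flow definitions;
    // "displayName" is what users see in the UI and may change freely.
    public static final PropertyDescriptor SCHEMA_NAME = new PropertyDescriptor.Builder()
            .name("schema-name")              // machine-readable, no spaces
            .displayName("Schema Name")       // human-facing label
            .description("Specifies the name of the schema to look up in the Schema Registry")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();
}
```

Keeping `name` space-free and stable means the display label can later be reworded without breaking existing flow configurations.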

[GitHub] nifi pull request #1712: NIFI-3724 - Add Put/Fetch Parquet Processors

2017-05-01 Thread alopresto
Github user alopresto commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1712#discussion_r114151998
  
--- Diff: 
nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java
 ---
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schema.access;
+
+import org.apache.nifi.avro.AvroSchemaValidator;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+public class SchemaAccessUtils {
+
+    public static final AllowableValue SCHEMA_NAME_PROPERTY = new AllowableValue("schema-name", "Use 'Schema Name' Property",
+            "The name of the Schema to use is specified by the 'Schema Name' Property. The value of this property is used to lookup the Schema in the configured Schema Registry service.");
+    public static final AllowableValue SCHEMA_TEXT_PROPERTY = new AllowableValue("schema-text-property", "Use 'Schema Text' Property",
+            "The text of the Schema itself is specified by the 'Schema Text' Property. The value of this property must be a valid Avro Schema. "
+                    + "If Expression Language is used, the value of the 'Schema Text' property must be valid after substituting the expressions.");
+    public static final AllowableValue HWX_CONTENT_ENCODED_SCHEMA = new AllowableValue("hwx-content-encoded-schema", "HWX Content-Encoded Schema Reference",
+            "The content of the FlowFile contains a reference to a schema in the Schema Registry service. The reference is encoded as a single byte indicating the 'protocol version', "
+                    + "followed by 8 bytes indicating the schema identifier, and finally 4 bytes indicating the schema version, as per the Hortonworks Schema Registry serializers and deserializers, "
+                    + "found at https://github.com/hortonworks/registry");
+    public static final AllowableValue HWX_SCHEMA_REF_ATTRIBUTES = new AllowableValue("hwx-schema-ref-attributes", "HWX Schema Reference Attributes",
+            "The FlowFile contains 3 Attributes that will be used to lookup a Schema from the configured Schema Registry: 'schema.identifier', 'schema.version', and 'schema.protocol.version'");
+
+    public static final PropertyDescriptor SCHEMA_REGISTRY = new PropertyDescriptor.Builder()
+            .name("Schema Registry")
+            .description("Specifies the Controller Service to use for the Schema Registry")
+            .identifiesControllerService(SchemaRegistry.class)
+            .required(false)
+            .build();
+
+    public static final PropertyDescriptor SCHEMA_ACCESS_STRATEGY = new PropertyDescriptor.Builder()
+            .name("Schema Access Strategy")
--- End diff --

Please use a `name` value without a space and provide a `displayName` field 
with human-facing value (i.e. "Schema Access Strategy"). 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request #1712: NIFI-3724 - Add Put/Fetch Parquet Processors

2017-05-01 Thread alopresto
Github user alopresto commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1712#discussion_r114151958
  
--- Diff: 
nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java
 ---
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schema.access;
+
+import org.apache.nifi.avro.AvroSchemaValidator;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+public class SchemaAccessUtils {
+
+    public static final AllowableValue SCHEMA_NAME_PROPERTY = new AllowableValue("schema-name", "Use 'Schema Name' Property",
+            "The name of the Schema to use is specified by the 'Schema Name' Property. The value of this property is used to lookup the Schema in the configured Schema Registry service.");
+    public static final AllowableValue SCHEMA_TEXT_PROPERTY = new AllowableValue("schema-text-property", "Use 'Schema Text' Property",
+            "The text of the Schema itself is specified by the 'Schema Text' Property. The value of this property must be a valid Avro Schema. "
+                    + "If Expression Language is used, the value of the 'Schema Text' property must be valid after substituting the expressions.");
+    public static final AllowableValue HWX_CONTENT_ENCODED_SCHEMA = new AllowableValue("hwx-content-encoded-schema", "HWX Content-Encoded Schema Reference",
+            "The content of the FlowFile contains a reference to a schema in the Schema Registry service. The reference is encoded as a single byte indicating the 'protocol version', "
+                    + "followed by 8 bytes indicating the schema identifier, and finally 4 bytes indicating the schema version, as per the Hortonworks Schema Registry serializers and deserializers, "
+                    + "found at https://github.com/hortonworks/registry");
+    public static final AllowableValue HWX_SCHEMA_REF_ATTRIBUTES = new AllowableValue("hwx-schema-ref-attributes", "HWX Schema Reference Attributes",
+            "The FlowFile contains 3 Attributes that will be used to lookup a Schema from the configured Schema Registry: 'schema.identifier', 'schema.version', and 'schema.protocol.version'");
+
+    public static final PropertyDescriptor SCHEMA_REGISTRY = new PropertyDescriptor.Builder()
+            .name("Schema Registry")
--- End diff --

Please use a `name` value without a space and provide a `displayName` field 
with human-facing value (i.e. "Schema Registry"). 




[GitHub] nifi pull request #1712: NIFI-3724 - Add Put/Fetch Parquet Processors

2017-05-01 Thread alopresto
Github user alopresto commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1712#discussion_r114150872
  
--- Diff: 
nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
 ---
@@ -0,0 +1,496 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.avro;
+
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class AvroTypeUtil {
+public static final String AVRO_SCHEMA_FORMAT = "avro";
+
+    public static Schema extractAvroSchema(final RecordSchema recordSchema) throws SchemaNotFoundException {
+        final Optional<String> schemaFormatOption = recordSchema.getSchemaFormat();
+        if (!schemaFormatOption.isPresent()) {
+            throw new SchemaNotFoundException("No Schema Format was present in the RecordSchema");
+        }
+
+        final String schemaFormat = schemaFormatOption.get();
+        if (!schemaFormat.equals(AVRO_SCHEMA_FORMAT)) {
+            throw new SchemaNotFoundException("Schema provided is not in Avro format");
+        }
+
+        final Optional<String> textOption = recordSchema.getSchemaText();
+        if (!textOption.isPresent()) {
+            throw new SchemaNotFoundException("No Schema text was present in the RecordSchema");
+        }
+
+        final String text = textOption.get();
+        return new Schema.Parser().parse(text);
+    }
+
+    public static DataType determineDataType(final Schema avroSchema) {
+        final Type avroType = avroSchema.getType();
--- End diff --

Same comment for `null` check. 




[GitHub] nifi pull request #1712: NIFI-3724 - Add Put/Fetch Parquet Processors

2017-05-01 Thread alopresto
Github user alopresto commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1712#discussion_r114148462
  
--- Diff: 
nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
 ---
@@ -0,0 +1,496 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.avro;
+
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class AvroTypeUtil {
+public static final String AVRO_SCHEMA_FORMAT = "avro";
+
+    public static Schema extractAvroSchema(final RecordSchema recordSchema) throws SchemaNotFoundException {
+        final Optional<String> schemaFormatOption = recordSchema.getSchemaFormat();
--- End diff --

Should we guard against `null` here?
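One way to add the guard being asked about is sketched below, using stand-in types rather than the real NiFi classes (the real `SchemaNotFoundException` is checked; here it extends `RuntimeException` purely to keep the illustration compact). The idea is to validate both the schema reference itself and a possibly-null `Optional` before dereferencing either.

```java
import java.util.Objects;
import java.util.Optional;

// Stand-in for the NiFi exception; the real one is a checked exception.
class SchemaNotFoundException extends RuntimeException {
    SchemaNotFoundException(String msg) { super(msg); }
}

// Stand-in for org.apache.nifi.serialization.record.RecordSchema.
interface RecordSchema {
    Optional<String> getSchemaFormat();
}

class NullGuardExample {
    // Fail fast on a null schema reference, and treat a null Optional the
    // same as an empty one, before calling get().
    static String extractFormat(RecordSchema recordSchema) {
        Objects.requireNonNull(recordSchema, "recordSchema must not be null");
        Optional<String> formatOption = recordSchema.getSchemaFormat();
        if (formatOption == null || !formatOption.isPresent()) {
            throw new SchemaNotFoundException("No Schema Format was present in the RecordSchema");
        }
        return formatOption.get();
    }
}
```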




[GitHub] nifi pull request #1712: NIFI-3724 - Add Put/Fetch Parquet Processors

2017-05-01 Thread alopresto
Github user alopresto commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1712#discussion_r114147981
  
--- Diff: 
nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
 ---
@@ -67,40 +61,14 @@
  */
 @RequiresInstanceClassLoading(cloneAncestorResources = true)
 public abstract class AbstractHadoopProcessor extends AbstractProcessor {
-/**
- * Compression Type Enum
- */
-public enum CompressionType {
-NONE,
-DEFAULT,
-BZIP,
-GZIP,
-LZ4,
-SNAPPY,
-AUTOMATIC;
-
-@Override
-public String toString() {
-switch (this) {
-case NONE: return "NONE";
-case DEFAULT: return DefaultCodec.class.getName();
-case BZIP: return BZip2Codec.class.getName();
-case GZIP: return GzipCodec.class.getName();
-case LZ4: return Lz4Codec.class.getName();
-case SNAPPY: return SnappyCodec.class.getName();
-case AUTOMATIC: return "Automatically Detected";
-}
-return null;
-}
-}
 
 // properties
     public static final PropertyDescriptor HADOOP_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
             .name("Hadoop Configuration Resources")
             .description("A file or comma separated list of files which contains the Hadoop file system configuration. Without this, Hadoop "
                     + "will search the classpath for a 'core-site.xml' and 'hdfs-site.xml' file or will revert to a default configuration.")
             .required(false)
-            .addValidator(createMultipleFilesExistValidator())
+            .addValidator(HadoopValidators.MULTIPLE_FILE_EXISTS_VALIDATOR)
--- End diff --

Minor comment -- until I read the source code for this, my interpretation 
was that this validator ensured that *multiple files existed* -- i.e. one file 
provided would fail. Perhaps we can rename this 
`ONE_OR_MORE_FILES_EXIST_VALIDATOR`? Not a giant issue but potentially 
confusing. 
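The semantics in question can be shown with a small self-contained sketch (not the NiFi `Validator` API): the property takes a comma-separated list, and validation passes when every listed file exists, so a single existing file is sufficient, hence the suggested `ONE_OR_MORE_FILES_EXIST` name.

```java
import java.io.File;

class FileListValidator {
    // Returns true when the input names one or more paths, ALL of which exist.
    // A single existing file passes; any missing file fails.
    static boolean oneOrMoreFilesExist(String commaSeparatedPaths) {
        if (commaSeparatedPaths == null || commaSeparatedPaths.trim().isEmpty()) {
            return false;
        }
        for (String path : commaSeparatedPaths.split(",")) {
            if (!new File(path.trim()).exists()) {
                return false;  // any missing entry invalidates the whole list
            }
        }
        return true;
    }
}
```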




[GitHub] nifi pull request #1712: NIFI-3724 - Add Put/Fetch Parquet Processors

2017-04-27 Thread bbende
GitHub user bbende opened a pull request:

https://github.com/apache/nifi/pull/1712

NIFI-3724 - Add Put/Fetch Parquet Processors

This PR adds a new nifi-parquet-bundle with PutParquet and FetchParquet 
processors. These work similar to PutHDFS and FetchHDFS, but instead read and 
write Records.

While working on this I needed to reuse portions of the record 
reader/writer code, and thus refactored some of the project structure which 
caused many files to move around.

Summary of changes:
- Created nifi-parquet-bundle
- Created nifi-commons/nifi-record to hold domain/API related to records
- Created nifi-nar-bundles/nifi-extension-utils as a place for utility code 
specific to extensions
- Moved nifi-commons/nifi-processor-utils under nifi-extension-utils
- Moved nifi-commons/nifi-hadoop-utils under nifi-extension-utils
- Created nifi-extension-utils/nifi-record-utils for utility code related to records

To test the Parquet processors you can create a core-site.xml with a local 
file system and read/write parquet to local directories:

```
<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>file:///</value>
    </property>
</configuration>
```


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bbende/nifi parquet-bundle

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/1712.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1712


commit a35e5957f5ff8c47df5352b7b1a5ef494fed8633
Author: Bryan Bende 
Date:   2017-04-12T22:25:31Z

NIFI-3724 - Initial commit of Parquet bundle with PutParquet and 
FetchParquet
- Creating nifi-records-utils to share utility code from record services
- Refactoring Parquet tests to use MockRecorderParser and MockRecordWriter
- Refactoring AbstractPutHDFSRecord to use schema access strategy
- Adding custom validate to AbstractPutHDFSRecord and adding handling of 
UNION types when writing Records as Avro
- Refactoring project structure to get CS API references out of 
nifi-commons, introducing nifi-extension-utils under nifi-nar-bundles



