http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/2d1e43e7/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/schema/SecurityPropertiesSchemaTest.java ---------------------------------------------------------------------- diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/schema/SecurityPropertiesSchemaTest.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/schema/SecurityPropertiesSchemaTest.java deleted file mode 100644 index ee889d2..0000000 --- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/schema/SecurityPropertiesSchemaTest.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.minifi.bootstrap.util.schema; - -import org.junit.Before; -import org.junit.Test; - -import java.util.HashMap; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -public class SecurityPropertiesSchemaTest { - private SecurityPropertiesSchema securityPropertiesSchema; - - @Before - public void setup() { - securityPropertiesSchema = new SecurityPropertiesSchema(new HashMap()); - } - - @Test - public void testKeystoreDefault() { - assertEquals("", securityPropertiesSchema.getKeystore()); - } - - @Test - public void testTruststoreDefault() { - assertEquals("", securityPropertiesSchema.getTruststore()); - } - - @Test - public void testSslProtocolDefault() { - assertEquals("", securityPropertiesSchema.getSslProtocol()); - } - - @Test - public void testKeystoreTypeDefault() { - assertEquals("", securityPropertiesSchema.getKeystoreType()); - } - - @Test - public void testKeyStorePasswdDefault() { - assertEquals("", securityPropertiesSchema.getKeystorePassword()); - } - - @Test - public void testKeyPasswordDefault() { - assertEquals("", securityPropertiesSchema.getKeyPassword()); - } - - @Test - public void testTruststoreTypeDefault() { - assertEquals("", securityPropertiesSchema.getTruststoreType()); - } - - @Test - public void testTruststorePasswdDefault() { - assertEquals("", securityPropertiesSchema.getTruststorePassword()); - } - - @Test - public void testEmptyMapConstructorValid() { - assertTrue(securityPropertiesSchema.isValid()); - } -}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/2d1e43e7/minifi-commons/minifi-commons-schema/pom.xml ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/pom.xml b/minifi-commons/minifi-commons-schema/pom.xml new file mode 100644 index 0000000..c2310dd --- /dev/null +++ b/minifi-commons/minifi-commons-schema/pom.xml @@ -0,0 +1,38 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi.minifi</groupId> + <artifactId>minifi-commons</artifactId> + <version>0.0.1-SNAPSHOT</version> + </parent> + + <artifactId>minifi-commons-schema</artifactId> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.yaml</groupId> + <artifactId>snakeyaml</artifactId> + <version>1.17</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/2d1e43e7/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ComponentStatusRepositorySchema.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ComponentStatusRepositorySchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ComponentStatusRepositorySchema.java new file mode 100644 index 0000000..02f3a78 --- /dev/null +++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ComponentStatusRepositorySchema.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.minifi.commons.schema; + +import org.apache.nifi.minifi.commons.schema.common.BaseSchema; + +import java.util.Map; + +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.COMPONENT_STATUS_REPO_KEY; + +public class ComponentStatusRepositorySchema extends BaseSchema { + public static final String BUFFER_SIZE_KEY = "buffer size"; + public static final String SNAPSHOT_FREQUENCY_KEY = "snapshot frequency"; + + public static final int DEFAULT_BUFFER_SIZE = 1440; + public static final String DEFAULT_SNAPSHOT_FREQUENCY = "1 min"; + + private Number bufferSize = DEFAULT_BUFFER_SIZE; + private String snapshotFrequency = DEFAULT_SNAPSHOT_FREQUENCY; + + public ComponentStatusRepositorySchema() { + } + + public ComponentStatusRepositorySchema(Map map) { + bufferSize = getOptionalKeyAsType(map, BUFFER_SIZE_KEY, Number.class, COMPONENT_STATUS_REPO_KEY, DEFAULT_BUFFER_SIZE); + snapshotFrequency = getOptionalKeyAsType(map, SNAPSHOT_FREQUENCY_KEY, String.class, COMPONENT_STATUS_REPO_KEY, DEFAULT_SNAPSHOT_FREQUENCY); + } + + @Override + public Map<String, Object> toMap() { + Map<String, Object> result = mapSupplier.get(); + result.put(BUFFER_SIZE_KEY, bufferSize); + result.put(SNAPSHOT_FREQUENCY_KEY, snapshotFrequency); + return result; + } + + public Number getBufferSize() { + return bufferSize; + } + + public String getSnapshotFrequency() { + return snapshotFrequency; + } +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/2d1e43e7/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java new file mode 100644 index 0000000..57f9124 --- /dev/null +++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.minifi.commons.schema; + +import org.apache.nifi.minifi.commons.schema.common.BaseSchema; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; +import java.util.stream.Collectors; + +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.COMPONENT_STATUS_REPO_KEY; +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.CONNECTIONS_KEY; +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.CONTENT_REPO_KEY; +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.CORE_PROPS_KEY; +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.FLOWFILE_REPO_KEY; +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.FLOW_CONTROLLER_PROPS_KEY; +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.PROCESSORS_KEY; +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.PROVENANCE_REPORTING_KEY; +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.PROVENANCE_REPO_KEY; +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.REMOTE_PROCESSING_GROUPS_KEY; +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.SECURITY_PROPS_KEY; + +/** + * + */ +public class ConfigSchema extends BaseSchema { + public static final String FOUND_THE_FOLLOWING_DUPLICATE_PROCESSOR_NAMES = "Found the following duplicate processor names: "; + public static final String FOUND_THE_FOLLOWING_DUPLICATE_CONNECTION_NAMES = "Found the following duplicate connection names: "; + public static final String FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_PROCESSING_GROUP_NAMES = "Found the following duplicate remote processing group names: "; + public static String TOP_LEVEL_NAME = "top level"; + + private FlowControllerSchema flowControllerProperties; + private CorePropertiesSchema coreProperties; + private FlowFileRepositorySchema flowfileRepositoryProperties; + private ContentRepositorySchema contentRepositoryProperties; + private ComponentStatusRepositorySchema componentStatusRepositoryProperties; + private SecurityPropertiesSchema securityProperties; + private List<ProcessorSchema> processors; + private List<ConnectionSchema> connections; + private List<RemoteProcessingGroupSchema> remoteProcessingGroups; + private ProvenanceReportingSchema provenanceReportingProperties; + + private ProvenanceRepositorySchema provenanceRepositorySchema; + + public ConfigSchema(Map map) { + flowControllerProperties = getMapAsType(map, FLOW_CONTROLLER_PROPS_KEY, FlowControllerSchema.class, TOP_LEVEL_NAME, true); + + coreProperties = getMapAsType(map, CORE_PROPS_KEY, CorePropertiesSchema.class, TOP_LEVEL_NAME, false); + flowfileRepositoryProperties = getMapAsType(map, FLOWFILE_REPO_KEY, FlowFileRepositorySchema.class, TOP_LEVEL_NAME, false); + contentRepositoryProperties = getMapAsType(map, CONTENT_REPO_KEY, ContentRepositorySchema.class, TOP_LEVEL_NAME, false); + provenanceRepositorySchema = getMapAsType(map, PROVENANCE_REPO_KEY, ProvenanceRepositorySchema.class, TOP_LEVEL_NAME, false); + componentStatusRepositoryProperties = getMapAsType(map, COMPONENT_STATUS_REPO_KEY, ComponentStatusRepositorySchema.class, TOP_LEVEL_NAME, false); + securityProperties = getMapAsType(map, SECURITY_PROPS_KEY, SecurityPropertiesSchema.class, TOP_LEVEL_NAME, false); + + processors = getOptionalKeyAsType(map, PROCESSORS_KEY, List.class, TOP_LEVEL_NAME, null); + if (processors != null) { + transformListToType(processors, "processor", ProcessorSchema.class, PROCESSORS_KEY); + } + + connections = getOptionalKeyAsType(map, CONNECTIONS_KEY, List.class, TOP_LEVEL_NAME, null); + if (connections != null) { + transformListToType(connections, "connection", ConnectionSchema.class, CONNECTIONS_KEY); + } + + remoteProcessingGroups = getOptionalKeyAsType(map, REMOTE_PROCESSING_GROUPS_KEY, List.class, TOP_LEVEL_NAME, null); + if (remoteProcessingGroups != null) { + transformListToType(remoteProcessingGroups, "remote processing group", RemoteProcessingGroupSchema.class, REMOTE_PROCESSING_GROUPS_KEY); + } + + provenanceReportingProperties = getMapAsType(map, PROVENANCE_REPORTING_KEY, ProvenanceReportingSchema.class, TOP_LEVEL_NAME, false, false); + + addIssuesIfNotNull(flowControllerProperties); + addIssuesIfNotNull(coreProperties); + addIssuesIfNotNull(flowfileRepositoryProperties); + addIssuesIfNotNull(contentRepositoryProperties); + addIssuesIfNotNull(componentStatusRepositoryProperties); + addIssuesIfNotNull(securityProperties); + addIssuesIfNotNull(provenanceReportingProperties); + addIssuesIfNotNull(provenanceRepositorySchema); + + if (processors != null) { + checkForDuplicateNames(FOUND_THE_FOLLOWING_DUPLICATE_PROCESSOR_NAMES, processors.stream().map(ProcessorSchema::getName).collect(Collectors.toList())); + for (ProcessorSchema processorSchema : processors) { + addIssuesIfNotNull(processorSchema); + } + } + + if (connections != null) { + checkForDuplicateNames(FOUND_THE_FOLLOWING_DUPLICATE_CONNECTION_NAMES, connections.stream().map(ConnectionSchema::getName).collect(Collectors.toList())); + for (ConnectionSchema connectionSchema : connections) { + addIssuesIfNotNull(connectionSchema); + } + } + + if (remoteProcessingGroups != null) { + checkForDuplicateNames(FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_PROCESSING_GROUP_NAMES, remoteProcessingGroups.stream().map(RemoteProcessingGroupSchema::getName).collect(Collectors.toList())); + for (RemoteProcessingGroupSchema remoteProcessingGroupSchema : remoteProcessingGroups) { + addIssuesIfNotNull(remoteProcessingGroupSchema); + } + } + } + + private void checkForDuplicateNames(String errorMessagePrefix, List<String> names) { + if (processors != null) { + Set<String> seenNames = new HashSet<>(); + Set<String> duplicateNames = new TreeSet<>(); + for (String name : names) { + if (!seenNames.add(name)) { + duplicateNames.add(name); + } + } + if (duplicateNames.size() > 0) { + StringBuilder errorMessage = new StringBuilder(errorMessagePrefix); + for (String duplicateName : duplicateNames) { + errorMessage.append(duplicateName); + errorMessage.append(", "); + } + errorMessage.setLength(errorMessage.length() - 2); + validationIssues.add(errorMessage.toString()); + } + } + } + + public Map<String, Object> toMap() { + Map<String, Object> result = mapSupplier.get(); + result.put(FLOW_CONTROLLER_PROPS_KEY, flowControllerProperties.toMap()); + putIfNotNull(result, CORE_PROPS_KEY, coreProperties); + putIfNotNull(result, FLOWFILE_REPO_KEY, flowfileRepositoryProperties); + putIfNotNull(result, CONTENT_REPO_KEY, contentRepositoryProperties); + putIfNotNull(result, PROVENANCE_REPO_KEY, provenanceRepositorySchema); + putIfNotNull(result, COMPONENT_STATUS_REPO_KEY, componentStatusRepositoryProperties); + putIfNotNull(result, SECURITY_PROPS_KEY, securityProperties); + putListIfNotNull(result, PROCESSORS_KEY, processors); + putListIfNotNull(result, CONNECTIONS_KEY, connections); + putListIfNotNull(result, REMOTE_PROCESSING_GROUPS_KEY, remoteProcessingGroups); + putIfNotNull(result, PROVENANCE_REPORTING_KEY, provenanceReportingProperties); + return result; + } + + public FlowControllerSchema getFlowControllerProperties() { + return flowControllerProperties; + } + + public CorePropertiesSchema getCoreProperties() { + return coreProperties; + } + + public FlowFileRepositorySchema getFlowfileRepositoryProperties() { + return flowfileRepositoryProperties; + } + + public ContentRepositorySchema getContentRepositoryProperties() { + return contentRepositoryProperties; + } + + public SecurityPropertiesSchema getSecurityProperties() { + return securityProperties; + } + + public List<ProcessorSchema> getProcessors() { + return processors; + } + + public List<ConnectionSchema> getConnections() { + return connections; + } + + public List<RemoteProcessingGroupSchema> getRemoteProcessingGroups() { + return remoteProcessingGroups; + } + + public ProvenanceReportingSchema getProvenanceReportingProperties() { + return provenanceReportingProperties; + } + + public ComponentStatusRepositorySchema getComponentStatusRepositoryProperties() { + return componentStatusRepositoryProperties; + } + + public ProvenanceRepositorySchema getProvenanceRepositorySchema() { + return provenanceRepositorySchema; + } +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/2d1e43e7/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConnectionSchema.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConnectionSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConnectionSchema.java new file mode 100644 index 0000000..34bd61f --- /dev/null +++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConnectionSchema.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.minifi.commons.schema; + +import org.apache.nifi.minifi.commons.schema.common.BaseSchema; + +import java.util.Map; + +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.CONNECTIONS_KEY; +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.NAME_KEY; + +/** + * + */ +public class ConnectionSchema extends BaseSchema { + public static final String SOURCE_NAME_KEY = "source name"; + public static final String SOURCE_RELATIONSHIP_NAME_KEY = "source relationship name"; + public static final String DESTINATION_NAME_KEY = "destination name"; + public static final String MAX_WORK_QUEUE_SIZE_KEY = "max work queue size"; + public static final String MAX_WORK_QUEUE_DATA_SIZE_KEY = "max work queue data size"; + public static final String FLOWFILE_EXPIRATION__KEY = "flowfile expiration"; + public static final String QUEUE_PRIORITIZER_CLASS_KEY = "queue prioritizer class"; + + public static final long DEFAULT_MAX_WORK_QUEUE_SIZE = 0; + public static final String DEFAULT_MAX_QUEUE_DATA_SIZE = "0 MB"; + public static final String DEFAULT_FLOWFILE_EXPIRATION = "0 sec"; + + private String name; + private String sourceName; + private String sourceRelationshipName; + private String destinationName; + + private Number maxWorkQueueSize = DEFAULT_MAX_WORK_QUEUE_SIZE; + private String maxWorkQueueDataSize = DEFAULT_MAX_QUEUE_DATA_SIZE; + private String flowfileExpiration = DEFAULT_FLOWFILE_EXPIRATION; + private String queuePrioritizerClass; + + public ConnectionSchema(Map map) { + name = getRequiredKeyAsType(map, NAME_KEY, String.class, CONNECTIONS_KEY); + sourceName = getRequiredKeyAsType(map, SOURCE_NAME_KEY, String.class, CONNECTIONS_KEY); + sourceRelationshipName = getRequiredKeyAsType(map, SOURCE_RELATIONSHIP_NAME_KEY, String.class, CONNECTIONS_KEY); + destinationName = getRequiredKeyAsType(map, DESTINATION_NAME_KEY, String.class, CONNECTIONS_KEY); + + maxWorkQueueSize = getOptionalKeyAsType(map, MAX_WORK_QUEUE_SIZE_KEY, Number.class, CONNECTIONS_KEY, DEFAULT_MAX_WORK_QUEUE_SIZE); + maxWorkQueueDataSize = getOptionalKeyAsType(map, MAX_WORK_QUEUE_DATA_SIZE_KEY, String.class, CONNECTIONS_KEY, DEFAULT_MAX_QUEUE_DATA_SIZE); + flowfileExpiration = getOptionalKeyAsType(map, FLOWFILE_EXPIRATION__KEY, String.class, CONNECTIONS_KEY, DEFAULT_FLOWFILE_EXPIRATION); + queuePrioritizerClass = getOptionalKeyAsType(map, QUEUE_PRIORITIZER_CLASS_KEY, String.class, CONNECTIONS_KEY, ""); + } + + @Override + public Map<String, Object> toMap() { + Map<String, Object> result = mapSupplier.get(); + result.put(NAME_KEY, name); + result.put(SOURCE_NAME_KEY, sourceName); + result.put(SOURCE_RELATIONSHIP_NAME_KEY, sourceRelationshipName); + result.put(DESTINATION_NAME_KEY, destinationName); + + result.put(MAX_WORK_QUEUE_SIZE_KEY, maxWorkQueueSize); + result.put(MAX_WORK_QUEUE_DATA_SIZE_KEY, maxWorkQueueDataSize); + result.put(FLOWFILE_EXPIRATION__KEY, flowfileExpiration); + result.put(QUEUE_PRIORITIZER_CLASS_KEY, queuePrioritizerClass); + return result; + } + + public String getName() { + return name; + } + + public String getSourceName() { + return sourceName; + } + + public String getSourceRelationshipName() { + return sourceRelationshipName; + } + + public String getDestinationName() { + return destinationName; + } + + public Number getMaxWorkQueueSize() { + return maxWorkQueueSize; + } + + public String getMaxWorkQueueDataSize() { + return maxWorkQueueDataSize; + } + + public String getFlowfileExpiration() { + return flowfileExpiration; + } + + public String getQueuePrioritizerClass() { + return queuePrioritizerClass; + } +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/2d1e43e7/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ContentRepositorySchema.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ContentRepositorySchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ContentRepositorySchema.java new file mode 100644 index 0000000..868cb79 --- /dev/null +++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ContentRepositorySchema.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.minifi.commons.schema; + +import org.apache.nifi.minifi.commons.schema.common.BaseSchema; + +import java.util.Map; + +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.ALWAYS_SYNC_KEY; +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.CONTENT_REPO_KEY; + +/** + * + */ +public class ContentRepositorySchema extends BaseSchema { + public static final String CONTENT_CLAIM_MAX_APPENDABLE_SIZE_KEY = "content claim max appendable size"; + public static final String CONTENT_CLAIM_MAX_FLOW_FILES_KEY = "content claim max flow files"; + + public static final String DEFAULT_CONTENT_CLAIM_MAX_APPENDABLE_SIZE = "10 MB"; + public static final int DEFAULT_CONTENT_CLAIM_MAX_FLOW_FILES = 100; + public static final boolean DEFAULT_ALWAYS_SYNC = false; + + private String contentClaimMaxAppendableSize = DEFAULT_CONTENT_CLAIM_MAX_APPENDABLE_SIZE; + private Number contentClaimMaxFlowFiles = DEFAULT_CONTENT_CLAIM_MAX_FLOW_FILES; + private Boolean alwaysSync = DEFAULT_ALWAYS_SYNC; + + public ContentRepositorySchema() { + } + + public ContentRepositorySchema(Map map) { + contentClaimMaxAppendableSize = getOptionalKeyAsType(map, CONTENT_CLAIM_MAX_APPENDABLE_SIZE_KEY, String.class, + CONTENT_REPO_KEY, DEFAULT_CONTENT_CLAIM_MAX_APPENDABLE_SIZE); + contentClaimMaxFlowFiles = getOptionalKeyAsType(map, CONTENT_CLAIM_MAX_FLOW_FILES_KEY, Number.class, + CONTENT_REPO_KEY, DEFAULT_CONTENT_CLAIM_MAX_FLOW_FILES); + alwaysSync = getOptionalKeyAsType(map, ALWAYS_SYNC_KEY, Boolean.class, CONTENT_REPO_KEY, DEFAULT_ALWAYS_SYNC); + } + + @Override + public Map<String, Object> toMap() { + Map<String, Object> result = mapSupplier.get(); + result.put(CONTENT_CLAIM_MAX_APPENDABLE_SIZE_KEY, contentClaimMaxAppendableSize); + result.put(CONTENT_CLAIM_MAX_FLOW_FILES_KEY, contentClaimMaxFlowFiles); + result.put(ALWAYS_SYNC_KEY, alwaysSync); + return result; + } + + public String getContentClaimMaxAppendableSize() { + return contentClaimMaxAppendableSize; + } + + public Number getContentClaimMaxFlowFiles() { + return contentClaimMaxFlowFiles; + } + + public boolean getAlwaysSync() { + return alwaysSync; + } +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/2d1e43e7/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/CorePropertiesSchema.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/CorePropertiesSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/CorePropertiesSchema.java new file mode 100644 index 0000000..ce30d9c --- /dev/null +++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/CorePropertiesSchema.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.minifi.commons.schema; + +import org.apache.nifi.minifi.commons.schema.common.BaseSchema; + +import java.util.Map; + +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.CORE_PROPS_KEY; +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.MAX_CONCURRENT_THREADS_KEY; + +/** + * + */ +public class CorePropertiesSchema extends BaseSchema { + + public static final String FLOW_CONTROLLER_SHUTDOWN_PERIOD_KEY = "flow controller graceful shutdown period"; + public static final String FLOW_SERVICE_WRITE_DELAY_INTERVAL_KEY = "flow service write delay interval"; + public static final String ADMINISTRATIVE_YIELD_DURATION_KEY = "administrative yield duration"; + public static final String BORED_YIELD_DURATION_KEY = "bored yield duration"; + + public static final String DEFAULT_FLOW_CONTROLLER_GRACEFUL_SHUTDOWN_PERIOD = "10 sec"; + public static final String DEFAULT_FLOW_SERVICE_WRITE_DELAY_INTERVAL = "500 ms"; + public static final String DEFAULT_ADMINISTRATIVE_YIELD_DURATION = "30 sec"; + public static final String DEFAULT_BORED_YIELD_DURATION = "10 millis"; + public static final int DEFAULT_MAX_CONCURRENT_THREADS = 1; + + private String flowControllerGracefulShutdownPeriod = DEFAULT_FLOW_CONTROLLER_GRACEFUL_SHUTDOWN_PERIOD; + private String flowServiceWriteDelayInterval = DEFAULT_FLOW_SERVICE_WRITE_DELAY_INTERVAL; + private String administrativeYieldDuration = DEFAULT_ADMINISTRATIVE_YIELD_DURATION; + private String boredYieldDuration = DEFAULT_BORED_YIELD_DURATION; + private Number maxConcurrentThreads = DEFAULT_MAX_CONCURRENT_THREADS; + + public CorePropertiesSchema() { + } + + public CorePropertiesSchema(Map map) { + flowControllerGracefulShutdownPeriod = getOptionalKeyAsType(map, FLOW_CONTROLLER_SHUTDOWN_PERIOD_KEY, String.class, + CORE_PROPS_KEY, DEFAULT_FLOW_CONTROLLER_GRACEFUL_SHUTDOWN_PERIOD); + flowServiceWriteDelayInterval = getOptionalKeyAsType(map, FLOW_SERVICE_WRITE_DELAY_INTERVAL_KEY, String.class, + CORE_PROPS_KEY, DEFAULT_FLOW_SERVICE_WRITE_DELAY_INTERVAL); + administrativeYieldDuration = getOptionalKeyAsType(map, ADMINISTRATIVE_YIELD_DURATION_KEY, String.class, + CORE_PROPS_KEY, DEFAULT_ADMINISTRATIVE_YIELD_DURATION); + boredYieldDuration = getOptionalKeyAsType(map, BORED_YIELD_DURATION_KEY, String.class, CORE_PROPS_KEY, DEFAULT_BORED_YIELD_DURATION); + maxConcurrentThreads = getOptionalKeyAsType(map, MAX_CONCURRENT_THREADS_KEY, Number.class, + CORE_PROPS_KEY, DEFAULT_MAX_CONCURRENT_THREADS); + } + + @Override + public Map<String, Object> toMap() { + Map<String, Object> result = mapSupplier.get(); + result.put(FLOW_CONTROLLER_SHUTDOWN_PERIOD_KEY, flowControllerGracefulShutdownPeriod); + result.put(FLOW_SERVICE_WRITE_DELAY_INTERVAL_KEY, flowServiceWriteDelayInterval); + result.put(ADMINISTRATIVE_YIELD_DURATION_KEY, administrativeYieldDuration); + result.put(BORED_YIELD_DURATION_KEY, boredYieldDuration); + result.put(MAX_CONCURRENT_THREADS_KEY, maxConcurrentThreads); + return result; + } + + public String getFlowControllerGracefulShutdownPeriod() { + return flowControllerGracefulShutdownPeriod; + } + + public String getFlowServiceWriteDelayInterval() { + return flowServiceWriteDelayInterval; + } + + public String getAdministrativeYieldDuration() { + return administrativeYieldDuration; + } + + public String getBoredYieldDuration() { + return boredYieldDuration; + } + + public Number getMaxConcurrentThreads() { + return maxConcurrentThreads; + } +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/2d1e43e7/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/FlowControllerSchema.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/FlowControllerSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/FlowControllerSchema.java new file mode 100644 index 0000000..3306029 --- /dev/null +++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/FlowControllerSchema.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.minifi.commons.schema; + +import org.apache.nifi.minifi.commons.schema.common.BaseSchema; + +import java.util.Map; + +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.COMMENT_KEY; +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.FLOW_CONTROLLER_PROPS_KEY; +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.NAME_KEY; + +/** + * + */ +public class FlowControllerSchema extends BaseSchema { + private String name; + private String comment; + + public FlowControllerSchema(Map map) { + name = getRequiredKeyAsType(map, NAME_KEY, String.class, FLOW_CONTROLLER_PROPS_KEY); + comment = getOptionalKeyAsType(map, COMMENT_KEY, String.class, FLOW_CONTROLLER_PROPS_KEY, ""); + } + + @Override + public Map<String, Object> toMap() { + Map<String, Object> result = mapSupplier.get(); + result.put(NAME_KEY, name); + result.put(COMMENT_KEY, comment); + return result; + } + + public String getName() { + return name; + } + + public String getComment() { + return comment; + } +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/2d1e43e7/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/FlowFileRepositorySchema.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/FlowFileRepositorySchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/FlowFileRepositorySchema.java new file mode 100644 index 0000000..cd7f456 --- /dev/null +++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/FlowFileRepositorySchema.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.minifi.commons.schema; + +import org.apache.nifi.minifi.commons.schema.common.BaseSchema; + +import java.util.Map; + +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.ALWAYS_SYNC_KEY; +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.FLOWFILE_REPO_KEY; +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.SWAP_PROPS_KEY; + +/** + * + */ +public class FlowFileRepositorySchema extends BaseSchema { + public static final String PARTITIONS_KEY = "partitions"; + public static final String CHECKPOINT_INTERVAL_KEY = "checkpoint interval"; + public static final int DEFAULT_PARTITIONS = 256; + public static final String DEFAULT_CHECKPOINT_INTERVAL = "2 mins"; + public static final boolean DEFAULT_ALWAYS_SYNC = false; + + private Number partitions = DEFAULT_PARTITIONS; + private String checkpointInterval = DEFAULT_CHECKPOINT_INTERVAL; + private Boolean alwaysSync = DEFAULT_ALWAYS_SYNC; + private SwapSchema swapProperties; + + public FlowFileRepositorySchema() { + swapProperties = new SwapSchema(); + } + + public FlowFileRepositorySchema(Map map) { + partitions = getOptionalKeyAsType(map, PARTITIONS_KEY, Number.class, FLOWFILE_REPO_KEY, DEFAULT_PARTITIONS); + checkpointInterval = getOptionalKeyAsType(map, CHECKPOINT_INTERVAL_KEY, String.class, FLOWFILE_REPO_KEY, DEFAULT_CHECKPOINT_INTERVAL); + alwaysSync = getOptionalKeyAsType(map, ALWAYS_SYNC_KEY, Boolean.class, FLOWFILE_REPO_KEY, DEFAULT_ALWAYS_SYNC); + + swapProperties = getMapAsType(map, SWAP_PROPS_KEY, SwapSchema.class, FLOWFILE_REPO_KEY, false); + addIssuesIfNotNull(swapProperties); + } + + @Override + public Map<String, Object> toMap() { + Map<String, Object> result = mapSupplier.get(); + result.put(PARTITIONS_KEY, partitions); + result.put(CHECKPOINT_INTERVAL_KEY, checkpointInterval); + result.put(ALWAYS_SYNC_KEY, alwaysSync); + putIfNotNull(result, SWAP_PROPS_KEY, swapProperties); + return result; + } + + public Number getPartitions() { + return partitions; + } + + public String getCheckpointInterval() { + return checkpointInterval; + } + + public boolean getAlwaysSync() { + return alwaysSync; + } + + public SwapSchema getSwapProperties() { + return swapProperties; + } +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/2d1e43e7/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProcessorSchema.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProcessorSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProcessorSchema.java new file mode 100644 index 0000000..d008df5 --- /dev/null +++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProcessorSchema.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.minifi.commons.schema; + +import org.apache.nifi.minifi.commons.schema.common.BaseSchema; +import org.apache.nifi.scheduling.SchedulingStrategy; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.MAX_CONCURRENT_TASKS_KEY; +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.NAME_KEY; +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.PROCESSORS_KEY; +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.SCHEDULING_PERIOD_KEY; +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.SCHEDULING_STRATEGY_KEY; +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.YIELD_PERIOD_KEY; + +/** + * + */ +public class ProcessorSchema extends BaseSchema { + public static final String CLASS_KEY = "class"; + public static final String PENALIZATION_PERIOD_KEY = "penalization period"; + public static final String RUN_DURATION_NANOS_KEY = "run duration nanos"; + public static final String AUTO_TERMINATED_RELATIONSHIPS_LIST_KEY = "auto-terminated relationships list"; + public static final String PROCESSOR_PROPS_KEY = "Properties"; + public static final int DEFAULT_MAX_CONCURRENT_TASKS = 1; + public static final String DEFAULT_PENALIZATION_PERIOD = "30 sec"; + public static final String DEFAULT_YIELD_DURATION = "1 sec"; + public static final long DEFAULT_RUN_DURATION_NANOS = 0; + public static final List<String> DEFAULT_AUTO_TERMINATED_RELATIONSHIPS_LIST = Collections.emptyList(); + public static final Map<String, Object> DEFAULT_PROPERTIES = Collections.emptyMap(); + public static final String IT_IS_NOT_A_VALID_SCHEDULING_STRATEGY = "it is not a valid scheduling strategy"; + + private String name; + private String processorClass; + private String schedulingStrategy; + private String schedulingPeriod; + private Number maxConcurrentTasks = DEFAULT_MAX_CONCURRENT_TASKS; + private String penalizationPeriod = DEFAULT_PENALIZATION_PERIOD; + private String yieldPeriod = DEFAULT_YIELD_DURATION; + private Number runDurationNanos = DEFAULT_RUN_DURATION_NANOS; + private List<String> autoTerminatedRelationshipsList = DEFAULT_AUTO_TERMINATED_RELATIONSHIPS_LIST; + private Map<String, Object> properties = DEFAULT_PROPERTIES; + + public ProcessorSchema(Map map) { + name = getRequiredKeyAsType(map, NAME_KEY, String.class, PROCESSORS_KEY); + processorClass = getRequiredKeyAsType(map, CLASS_KEY, String.class, PROCESSORS_KEY); + schedulingStrategy = getRequiredKeyAsType(map, SCHEDULING_STRATEGY_KEY, String.class, PROCESSORS_KEY); + if (schedulingStrategy != null && !isSchedulingStrategy(schedulingStrategy)) { + addValidationIssue(SCHEDULING_STRATEGY_KEY, PROCESSORS_KEY, IT_IS_NOT_A_VALID_SCHEDULING_STRATEGY); + } + schedulingPeriod = getRequiredKeyAsType(map, SCHEDULING_PERIOD_KEY, String.class, PROCESSORS_KEY); + + maxConcurrentTasks = getOptionalKeyAsType(map, MAX_CONCURRENT_TASKS_KEY, Number.class, PROCESSORS_KEY, DEFAULT_MAX_CONCURRENT_TASKS); + penalizationPeriod = getOptionalKeyAsType(map, PENALIZATION_PERIOD_KEY, String.class, PROCESSORS_KEY, DEFAULT_PENALIZATION_PERIOD); + yieldPeriod = getOptionalKeyAsType(map, YIELD_PERIOD_KEY, String.class, PROCESSORS_KEY, DEFAULT_YIELD_DURATION); + runDurationNanos = getOptionalKeyAsType(map, RUN_DURATION_NANOS_KEY, Number.class, PROCESSORS_KEY, DEFAULT_RUN_DURATION_NANOS); + autoTerminatedRelationshipsList = getOptionalKeyAsType(map, AUTO_TERMINATED_RELATIONSHIPS_LIST_KEY, List.class, PROCESSORS_KEY, DEFAULT_AUTO_TERMINATED_RELATIONSHIPS_LIST); + properties = getOptionalKeyAsType(map, PROCESSOR_PROPS_KEY, Map.class, PROCESSORS_KEY, DEFAULT_PROPERTIES); + } + + private static boolean isSchedulingStrategy(String string) { + try { + SchedulingStrategy.valueOf(string); + } catch (Exception e) { + return false; + } + return true; + } + + @Override + public Map<String, Object> toMap() { + Map<String, Object> result = mapSupplier.get(); + result.put(NAME_KEY, name); + result.put(CLASS_KEY, processorClass); + result.put(MAX_CONCURRENT_TASKS_KEY, maxConcurrentTasks); + result.put(SCHEDULING_STRATEGY_KEY, schedulingStrategy); + result.put(SCHEDULING_PERIOD_KEY, schedulingPeriod); + result.put(PENALIZATION_PERIOD_KEY, penalizationPeriod); + result.put(YIELD_PERIOD_KEY, yieldPeriod); + result.put(RUN_DURATION_NANOS_KEY, runDurationNanos); + result.put(AUTO_TERMINATED_RELATIONSHIPS_LIST_KEY, autoTerminatedRelationshipsList); + result.put(PROCESSOR_PROPS_KEY, new TreeMap<>(properties)); + return result; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getProcessorClass() { + return processorClass; + } + + public Number getMaxConcurrentTasks() { + return maxConcurrentTasks; + } + + public String getSchedulingStrategy() { + return schedulingStrategy; + } + + public String getSchedulingPeriod() { + return schedulingPeriod; + } + + public String getPenalizationPeriod() { + return penalizationPeriod; + } + + public String getYieldPeriod() { + return yieldPeriod; + } + + public Number getRunDurationNanos() { + return runDurationNanos; + } + + public List<String> getAutoTerminatedRelationshipsList() { + return autoTerminatedRelationshipsList; + } + + public Map<String, Object> getProperties() { + return properties; + } +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/2d1e43e7/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProvenanceReportingSchema.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProvenanceReportingSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProvenanceReportingSchema.java new file mode 100644 index 0000000..b12adb7 --- /dev/null +++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProvenanceReportingSchema.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.minifi.commons.schema; + +import org.apache.nifi.minifi.commons.schema.common.BaseSchema; +import org.apache.nifi.scheduling.SchedulingStrategy; + +import java.util.Map; + +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.COMMENT_KEY; +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.PROVENANCE_REPORTING_KEY; +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.SCHEDULING_PERIOD_KEY; +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.SCHEDULING_STRATEGY_KEY; +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.USE_COMPRESSION_KEY; +import static org.apache.nifi.minifi.commons.schema.RemoteProcessingGroupSchema.TIMEOUT_KEY; + +/** + * + */ +public class ProvenanceReportingSchema extends BaseSchema { + public static final String DESTINATION_URL_KEY = "destination url"; + public static final String PORT_NAME_KEY = "port name"; + public static final String ORIGINATING_URL_KEY = "originating url"; + public static final String BATCH_SIZE_KEY = "batch size"; + + public static final String DEFAULT_ORGINATING_URL = "http://${hostname(true)}:8080/nifi"; + public static final String DEFAULT_TIMEOUT = "30 secs"; + public static final int DEFAULT_BATCH_SIZE = 1000; + public static final boolean DEFAULT_USE_COMPRESSION = true; + + private String schedulingStrategy; + private String schedulingPeriod; + private String destinationUrl; + private String portName; + + private String comment; + private String originatingUrl = DEFAULT_ORGINATING_URL; + private Boolean useCompression = DEFAULT_USE_COMPRESSION; + private String timeout = DEFAULT_TIMEOUT; + private Number batchSize = DEFAULT_BATCH_SIZE; + + public ProvenanceReportingSchema(Map map) { + schedulingStrategy = getRequiredKeyAsType(map, SCHEDULING_STRATEGY_KEY, String.class, PROVENANCE_REPORTING_KEY); + if (schedulingStrategy != null) { + try { + SchedulingStrategy.valueOf(schedulingStrategy); + } catch (IllegalArgumentException e) { + addValidationIssue(SCHEDULING_STRATEGY_KEY, PROVENANCE_REPORTING_KEY, "it is not a valid scheduling strategy"); + } + } + + schedulingPeriod = getRequiredKeyAsType(map, SCHEDULING_PERIOD_KEY, String.class, PROVENANCE_REPORTING_KEY); + destinationUrl = getRequiredKeyAsType(map, DESTINATION_URL_KEY, String.class, PROVENANCE_REPORTING_KEY); + portName = getRequiredKeyAsType(map, PORT_NAME_KEY, String.class, PROVENANCE_REPORTING_KEY); + + comment = getOptionalKeyAsType(map, COMMENT_KEY, String.class, PROVENANCE_REPORTING_KEY, ""); + originatingUrl = getOptionalKeyAsType(map, ORIGINATING_URL_KEY, String.class, PROVENANCE_REPORTING_KEY, DEFAULT_ORGINATING_URL); + useCompression = getOptionalKeyAsType(map, USE_COMPRESSION_KEY, Boolean.class, PROVENANCE_REPORTING_KEY, DEFAULT_USE_COMPRESSION); + timeout = getOptionalKeyAsType(map, TIMEOUT_KEY, String.class, PROVENANCE_REPORTING_KEY, DEFAULT_TIMEOUT); + batchSize = getOptionalKeyAsType(map, BATCH_SIZE_KEY, Number.class, PROVENANCE_REPORTING_KEY, DEFAULT_BATCH_SIZE); + } + + @Override + public Map<String, Object> toMap() { + Map<String, Object> result = mapSupplier.get(); + result.put(COMMENT_KEY, comment); + result.put(SCHEDULING_STRATEGY_KEY, schedulingStrategy); + result.put(SCHEDULING_PERIOD_KEY, schedulingPeriod); + result.put(DESTINATION_URL_KEY, destinationUrl); + result.put(PORT_NAME_KEY, portName); + result.put(ORIGINATING_URL_KEY, originatingUrl); + result.put(USE_COMPRESSION_KEY, useCompression); + result.put(TIMEOUT_KEY, timeout); + result.put(BATCH_SIZE_KEY, batchSize); + return result; + } + + public String getComment() { + return comment; + } + + public String getSchedulingStrategy() { + return schedulingStrategy; + } + + public String getSchedulingPeriod() { + return schedulingPeriod; + } + + public String getDestinationUrl() { + return destinationUrl; + } + + public String getPortName() { + return portName; + } + + public String getOriginatingUrl() { + return originatingUrl; + } + + public boolean getUseCompression() { + return useCompression; + } + + public String getTimeout() { + return timeout; + } + + public Number getBatchSize() { + return batchSize; + } +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/2d1e43e7/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProvenanceRepositorySchema.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProvenanceRepositorySchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProvenanceRepositorySchema.java new file mode 100644 index 0000000..ac858ef --- /dev/null +++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProvenanceRepositorySchema.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.minifi.commons.schema; + +import org.apache.nifi.minifi.commons.schema.common.BaseSchema; + +import java.util.Map; + +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.PROVENANCE_REPO_KEY; + +public class ProvenanceRepositorySchema extends BaseSchema { + public static final String PROVENANCE_REPO_ROLLOVER_TIME_KEY = "provenance rollover time"; + + public static final String DEFAULT_PROVENANCE_ROLLOVER_TIME = "1 min"; + + private String provenanceRepoRolloverTime = DEFAULT_PROVENANCE_ROLLOVER_TIME; + + public ProvenanceRepositorySchema(){ + } + + public ProvenanceRepositorySchema(Map map) { + provenanceRepoRolloverTime = getOptionalKeyAsType(map, PROVENANCE_REPO_ROLLOVER_TIME_KEY, String.class, + PROVENANCE_REPO_KEY, DEFAULT_PROVENANCE_ROLLOVER_TIME); + } + + @Override + public Map<String, Object> toMap() { + Map<String, Object> result = mapSupplier.get(); + result.put(PROVENANCE_REPO_ROLLOVER_TIME_KEY, provenanceRepoRolloverTime); + return result; + } + + public String getProvenanceRepoRolloverTimeKey() { + return provenanceRepoRolloverTime; + } +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/2d1e43e7/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/RemoteInputPortSchema.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/RemoteInputPortSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/RemoteInputPortSchema.java new file mode 100644 index 0000000..f12f443 --- /dev/null +++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/RemoteInputPortSchema.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.minifi.commons.schema; + +import org.apache.nifi.minifi.commons.schema.common.BaseSchema; + +import java.util.Map; + +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.COMMENT_KEY; +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.ID_KEY; +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.INPUT_PORTS_KEY; +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.MAX_CONCURRENT_TASKS_KEY; +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.NAME_KEY; +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.USE_COMPRESSION_KEY; + +/** + * + */ +public class RemoteInputPortSchema extends BaseSchema { + public static final String DEFAULT_COMMENT = ""; + public static final int DEFAULT_MAX_CONCURRENT_TASKS = 1; + public static final boolean DEFAULT_USE_COMPRESSION = true; + + private String id; + private String name; + + private String comment = DEFAULT_COMMENT; + private Number maxConcurrentTasks = DEFAULT_MAX_CONCURRENT_TASKS; + private Boolean useCompression = DEFAULT_USE_COMPRESSION; + + public RemoteInputPortSchema(Map map) { + id = getRequiredKeyAsType(map, ID_KEY, String.class, INPUT_PORTS_KEY); + name = getRequiredKeyAsType(map, NAME_KEY, String.class, INPUT_PORTS_KEY); + + comment = getOptionalKeyAsType(map, COMMENT_KEY, String.class, INPUT_PORTS_KEY, DEFAULT_COMMENT); + maxConcurrentTasks = getOptionalKeyAsType(map, MAX_CONCURRENT_TASKS_KEY, Number.class, INPUT_PORTS_KEY, DEFAULT_MAX_CONCURRENT_TASKS); + useCompression = getOptionalKeyAsType(map, USE_COMPRESSION_KEY, Boolean.class, INPUT_PORTS_KEY, DEFAULT_USE_COMPRESSION); + } + + @Override + public Map<String, Object> toMap() { + Map<String, Object> result = mapSupplier.get(); + result.put(ID_KEY, id); + result.put(NAME_KEY, name); + result.put(COMMENT_KEY, comment); + result.put(MAX_CONCURRENT_TASKS_KEY, maxConcurrentTasks); + result.put(USE_COMPRESSION_KEY, useCompression); + return result; + } + + public String getId() { + return id; + } + + public String getName() { + return name; + } + + public String getComment() { + return comment; + } + + public Number getMax_concurrent_tasks() { + return maxConcurrentTasks; + } + + public boolean getUseCompression() { + return useCompression; + } +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/2d1e43e7/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/RemoteProcessingGroupSchema.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/RemoteProcessingGroupSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/RemoteProcessingGroupSchema.java new file mode 100644 index 0000000..3fb351e --- /dev/null +++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/RemoteProcessingGroupSchema.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.minifi.commons.schema; + +import org.apache.nifi.minifi.commons.schema.common.BaseSchema; + +import java.util.List; +import java.util.Map; + +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.COMMENT_KEY; +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.INPUT_PORTS_KEY; +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.NAME_KEY; +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.REMOTE_PROCESSING_GROUPS_KEY; +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.YIELD_PERIOD_KEY; + +/** + * + */ +public class RemoteProcessingGroupSchema extends BaseSchema { + public static final String URL_KEY = "url"; + public static final String TIMEOUT_KEY = "timeout"; + + public static final String DEFAULT_COMMENT = ""; + public static final String DEFAULT_TIMEOUT = "30 secs"; + public static final String DEFAULT_YIELD_PERIOD = "10 sec"; + + private String name; + private String url; + private List<RemoteInputPortSchema> inputPorts; + + private String comment = DEFAULT_COMMENT; + private String timeout = DEFAULT_TIMEOUT; + private String yieldPeriod = DEFAULT_YIELD_PERIOD; + + public RemoteProcessingGroupSchema(Map map) { + name = getRequiredKeyAsType(map, NAME_KEY, String.class, REMOTE_PROCESSING_GROUPS_KEY); + url = getRequiredKeyAsType(map, URL_KEY, String.class, REMOTE_PROCESSING_GROUPS_KEY); + inputPorts = getRequiredKeyAsType(map, INPUT_PORTS_KEY, List.class, REMOTE_PROCESSING_GROUPS_KEY); + if (inputPorts != null) { + transformListToType(inputPorts, "input port", RemoteInputPortSchema.class, INPUT_PORTS_KEY); + + for (RemoteInputPortSchema remoteInputPortSchema: inputPorts) { + addIssuesIfNotNull(remoteInputPortSchema); + } + } + + comment = getOptionalKeyAsType(map, COMMENT_KEY, String.class, REMOTE_PROCESSING_GROUPS_KEY, DEFAULT_COMMENT); + timeout = getOptionalKeyAsType(map, TIMEOUT_KEY, String.class, REMOTE_PROCESSING_GROUPS_KEY, DEFAULT_TIMEOUT); + yieldPeriod = getOptionalKeyAsType(map, YIELD_PERIOD_KEY, String.class, REMOTE_PROCESSING_GROUPS_KEY, DEFAULT_YIELD_PERIOD); + } + + @Override + public Map<String, Object> toMap() { + Map<String, Object> result = mapSupplier.get(); + result.put(NAME_KEY, name); + result.put(URL_KEY, url); + result.put(COMMENT_KEY, comment); + result.put(TIMEOUT_KEY, timeout); + result.put(YIELD_PERIOD_KEY, yieldPeriod); + putListIfNotNull(result, INPUT_PORTS_KEY, inputPorts); + return result; + } + + public String getName() { + return name; + } + + public String getComment() { + return comment; + } + + public String getUrl() { + return url; + } + + public String getTimeout() { + return timeout; + } + + public String getYieldPeriod() { + return yieldPeriod; + } + + public List<RemoteInputPortSchema> getInputPorts() { + return inputPorts; + } +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/2d1e43e7/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/SecurityPropertiesSchema.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/SecurityPropertiesSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/SecurityPropertiesSchema.java new file mode 100644 index 0000000..3f0c6c1 --- /dev/null +++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/SecurityPropertiesSchema.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.minifi.commons.schema; + +import org.apache.nifi.minifi.commons.schema.common.BaseSchema; + +import java.util.Map; + +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.SECURITY_PROPS_KEY; +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.SENSITIVE_PROPS_KEY; + +/** + * + */ +public class SecurityPropertiesSchema extends BaseSchema { + + public static final String KEYSTORE_KEY = "keystore"; + public static final String KEYSTORE_TYPE_KEY = "keystore type"; + public static final String KEYSTORE_PASSWORD_KEY = "keystore password"; + public static final String KEY_PASSWORD_KEY = "key password"; + public static final String TRUSTSTORE_KEY = "truststore"; + public static final String TRUSTSTORE_TYPE_KEY = "truststore type"; + public static final String TRUSTSTORE_PASSWORD_KEY = "truststore password"; + public static final String SSL_PROTOCOL_KEY = "ssl protocol"; + + private String keystore = ""; + private String keystoreType = ""; + private String keystorePassword = ""; + private String keyPassword = ""; + private String truststore = ""; + private String truststoreType = ""; + private String truststorePassword = ""; + private String sslProtocol = ""; + private SensitivePropsSchema sensitiveProps; + + public SecurityPropertiesSchema() { + sensitiveProps = new SensitivePropsSchema(); + } + + public SecurityPropertiesSchema(Map map) { + keystore = getOptionalKeyAsType(map, KEYSTORE_KEY, String.class, SECURITY_PROPS_KEY, ""); + + keystoreType = getOptionalKeyAsType(map, KEYSTORE_TYPE_KEY, String.class, SECURITY_PROPS_KEY, ""); + if (!isNullOrEmpty(keystoreType)) { + if (validateStoreType(keystoreType)) { + addValidationIssue(KEYSTORE_TYPE_KEY, SECURITY_PROPS_KEY, "it is not a supported type (must be either PKCS12 or JKS format)"); + } + } + + keystorePassword = getOptionalKeyAsType(map, KEYSTORE_PASSWORD_KEY, String.class, SECURITY_PROPS_KEY, ""); + + keyPassword = getOptionalKeyAsType(map, KEY_PASSWORD_KEY, String.class, SECURITY_PROPS_KEY, ""); + + truststore = getOptionalKeyAsType(map, TRUSTSTORE_KEY, String.class, SECURITY_PROPS_KEY, ""); + + truststoreType = getOptionalKeyAsType(map, TRUSTSTORE_TYPE_KEY, String.class, SECURITY_PROPS_KEY, ""); + if (!isNullOrEmpty(truststoreType)) { + if (validateStoreType(truststoreType)) { + addValidationIssue(TRUSTSTORE_TYPE_KEY, SECURITY_PROPS_KEY, "it is not a supported type (must be either PKCS12 or JKS format)"); + } + } + + truststorePassword = getOptionalKeyAsType(map, TRUSTSTORE_PASSWORD_KEY, String.class, SECURITY_PROPS_KEY, ""); + + sslProtocol = getOptionalKeyAsType(map, SSL_PROTOCOL_KEY, String.class, SECURITY_PROPS_KEY, ""); + if (!isNullOrEmpty(sslProtocol)) { + switch (sslProtocol) { + case "SSL": + break; + case "SSLv2Hello": + break; + case "SSLv3": + break; + case "TLS": + break; + case "TLSv1": + break; + case "TLSv1.1": + break; + case "TLSv1.2": + break; + default: + addValidationIssue(SSL_PROTOCOL_KEY, SECURITY_PROPS_KEY, "it is not an allowable value of SSL protocol"); + break; + } + if (isNullOrEmpty(keystore)) { + validationIssues.add("When the '" + SSL_PROTOCOL_KEY + "' key of '" + SECURITY_PROPS_KEY + "' is set, the '" + KEYSTORE_KEY + "' must also be set"); + } else if (isNullOrEmpty(keystoreType) || isNullOrEmpty(keystorePassword) || isNullOrEmpty(keyPassword)) { + validationIssues.add("When the '" + KEYSTORE_KEY + "' key of '" + SECURITY_PROPS_KEY + "' is set, the '" + KEYSTORE_TYPE_KEY + "', '" + KEYSTORE_PASSWORD_KEY + + "' and '" + KEY_PASSWORD_KEY + "' all must also be set"); + } + + if (!isNullOrEmpty(truststore) && (isNullOrEmpty(truststoreType) || isNullOrEmpty(truststorePassword))) { + validationIssues.add("When the '" + TRUSTSTORE_KEY + "' key of '" + SECURITY_PROPS_KEY + "' is set, the '" + TRUSTSTORE_TYPE_KEY + "' and '" + + TRUSTSTORE_PASSWORD_KEY + "' must also be set"); + } + } + + sensitiveProps = getMapAsType(map, SENSITIVE_PROPS_KEY, SensitivePropsSchema.class, SECURITY_PROPS_KEY, false); + + addIssuesIfNotNull(sensitiveProps); + } + + @Override + public Map<String, Object> toMap() { + Map<String, Object> result = mapSupplier.get(); + result.put(KEYSTORE_KEY, keystore); + result.put(KEYSTORE_TYPE_KEY, keystoreType); + result.put(KEYSTORE_PASSWORD_KEY, keystorePassword); + result.put(KEY_PASSWORD_KEY, keyPassword); + result.put(TRUSTSTORE_KEY, truststore); + result.put(TRUSTSTORE_TYPE_KEY, truststoreType); + result.put(TRUSTSTORE_PASSWORD_KEY, truststorePassword); + result.put(SSL_PROTOCOL_KEY, sslProtocol); + putIfNotNull(result, SENSITIVE_PROPS_KEY, sensitiveProps); + return result; + } + + private boolean validateStoreType(String store) { + return !store.isEmpty() && !(store.equalsIgnoreCase("JKS") || store.equalsIgnoreCase("PKCS12")); + } + + public boolean useSSL() { + return !isNullOrEmpty(sslProtocol); + } + + public String getKeystore() { + return keystore; + } + + public String getKeystoreType() { + return keystoreType; + } + + public String getKeystorePassword() { + return keystorePassword; + } + + public String getKeyPassword() { + return keyPassword; + } + + public String getTruststore() { + return truststore; + } + + public String getTruststoreType() { + return truststoreType; + } + + public String getTruststorePassword() { + return truststorePassword; + } + + public String getSslProtocol() { + return sslProtocol; + } + + public SensitivePropsSchema getSensitiveProps() { + return sensitiveProps; + } +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/2d1e43e7/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/SensitivePropsSchema.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/SensitivePropsSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/SensitivePropsSchema.java new file mode 100644 index 0000000..93260ea --- /dev/null +++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/SensitivePropsSchema.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.minifi.commons.schema; + +import org.apache.nifi.minifi.commons.schema.common.BaseSchema; + +import java.util.Map; + +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.SENSITIVE_PROPS_KEY; + +/** + * + */ +public class SensitivePropsSchema extends BaseSchema { + public static final String SENSITIVE_PROPS_KEY_KEY = "key"; + public static final String SENSITIVE_PROPS_ALGORITHM_KEY = "algorithm"; + public static final String SENSITIVE_PROPS_PROVIDER_KEY = "provider"; + + public static final String DEFAULT_ALGORITHM = "PBEWITHMD5AND256BITAES-CBC-OPENSSL"; + public static final String DEFAULT_PROVIDER = "BC"; + + private String key; + private String algorithm = DEFAULT_ALGORITHM; + private String provider = DEFAULT_PROVIDER; + + public SensitivePropsSchema() { + } + + public SensitivePropsSchema(Map map) { + key = getOptionalKeyAsType(map, SENSITIVE_PROPS_KEY_KEY, String.class, SENSITIVE_PROPS_KEY, ""); + algorithm = getOptionalKeyAsType(map, SENSITIVE_PROPS_ALGORITHM_KEY, String.class, SENSITIVE_PROPS_KEY, DEFAULT_ALGORITHM); + provider = getOptionalKeyAsType(map, SENSITIVE_PROPS_PROVIDER_KEY, String.class, SENSITIVE_PROPS_KEY, DEFAULT_PROVIDER); + } + + @Override + public Map<String, Object> toMap() { + Map<String, Object> result = mapSupplier.get(); + result.put(SENSITIVE_PROPS_KEY_KEY, key); + result.put(SENSITIVE_PROPS_ALGORITHM_KEY, algorithm); + result.put(SENSITIVE_PROPS_PROVIDER_KEY, provider); + return result; + } + + public String getKey() { + return key; + } + + public String getAlgorithm() { + return algorithm; + } + + public String getProvider() { + return provider; + } +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/2d1e43e7/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/SwapSchema.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/SwapSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/SwapSchema.java new file mode 100644 index 0000000..d38ce7a --- /dev/null +++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/SwapSchema.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.minifi.commons.schema; + +import org.apache.nifi.minifi.commons.schema.common.BaseSchema; + +import java.util.Map; + +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.SWAP_PROPS_KEY; + +/** + * + */ +public class SwapSchema extends BaseSchema { + public static final String THRESHOLD_KEY = "threshold"; + public static final String IN_PERIOD_KEY = "in period"; + public static final String IN_THREADS_KEY = "in threads"; + public static final String OUT_PERIOD_KEY = "out period"; + public static final String OUT_THREADS_KEY = "out threads"; + + public static final int DEFAULT_THRESHOLD = 20000; + public static final String DEFAULT_IN_PERIOD = "5 sec"; + public static final int DEFAULT_IN_THREADS = 1; + public static final String DEFAULT_OUT_PERIOD = "5 sec"; + public static final int DEFAULT_OUT_THREADS = 4; + + private Number threshold = DEFAULT_THRESHOLD; + private String inPeriod = DEFAULT_IN_PERIOD; + private Number inThreads = DEFAULT_IN_THREADS; + private String outPeriod = DEFAULT_OUT_PERIOD; + private Number outThreads = DEFAULT_OUT_THREADS; + + public SwapSchema() { + } + + public SwapSchema(Map map) { + threshold = getOptionalKeyAsType(map, THRESHOLD_KEY, Number.class, SWAP_PROPS_KEY, DEFAULT_THRESHOLD); + inPeriod = getOptionalKeyAsType(map, IN_PERIOD_KEY, String.class, SWAP_PROPS_KEY, DEFAULT_IN_PERIOD); + inThreads = getOptionalKeyAsType(map, IN_THREADS_KEY, Number.class, SWAP_PROPS_KEY, DEFAULT_IN_THREADS); + outPeriod = getOptionalKeyAsType(map, OUT_PERIOD_KEY, String.class, SWAP_PROPS_KEY, DEFAULT_OUT_PERIOD); + outThreads = getOptionalKeyAsType(map, OUT_THREADS_KEY, Number.class, SWAP_PROPS_KEY, DEFAULT_OUT_THREADS); + } + + @Override + public Map<String, Object> toMap() { + Map<String, Object> result = mapSupplier.get(); + result.put(THRESHOLD_KEY, threshold); + result.put(IN_PERIOD_KEY, inPeriod); + result.put(IN_THREADS_KEY, inThreads); + result.put(OUT_PERIOD_KEY, outPeriod); + result.put(OUT_THREADS_KEY, outThreads); + return result; + } + + public Number getThreshold() { + return threshold; + } + + public String getInPeriod() { + return inPeriod; + } + + public Number getInThreads() { + return inThreads; + } + + public String getOutPeriod() { + return outPeriod; + } + + public Number getOutThreads() { + return outThreads; + } +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/2d1e43e7/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchema.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchema.java new file mode 100644 index 0000000..8c85acb --- /dev/null +++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchema.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.minifi.commons.schema.common; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +public abstract class BaseSchema { + public static final String IT_WAS_NOT_FOUND_AND_IT_IS_REQUIRED = "it was not found and it is required"; + protected final Supplier<Map<String, Object>> mapSupplier; + + public BaseSchema() { + this(LinkedHashMap::new); + } + + public BaseSchema(Supplier<Map<String, Object>> mapSupplier) { + this.mapSupplier = mapSupplier; + } + + /******* Validation Issue helper methods *******/ + public List<String> validationIssues = new LinkedList<>(); + + public boolean isValid() { + return validationIssues.isEmpty(); + } + + public List<String> getValidationIssues() { + return validationIssues; + } + + public String getValidationIssuesAsString() { + StringBuilder stringBuilder = new StringBuilder(); + boolean first = true; + for (String validationIssue : validationIssues) { + if (!first) { + stringBuilder.append(", "); + } + stringBuilder.append("["); + stringBuilder.append(validationIssue); + stringBuilder.append("]"); + first = false; + } + return stringBuilder.toString(); + } + + public <T> T getAndValidateNotNull(Supplier<T> supplier, String keyName, String wrapperName) { + return getAndValidate(supplier, t -> t != null, keyName, wrapperName, IT_WAS_NOT_FOUND_AND_IT_IS_REQUIRED); + } + + public <T> T getAndValidate(Supplier<T> supplier, Predicate<T> predicate, String keyName, String wrapperName, String reason) { + T result = supplier.get(); + if (!predicate.test(result)) { + addValidationIssue(keyName, wrapperName, reason); + } + return result; + } + + public void addValidationIssue(String keyName, String wrapperName, String reason) { + validationIssues.add("'" + keyName + "' in section '" + wrapperName + "' because " + reason); + } + + public void addIssuesIfNotNull(BaseSchema baseSchema) { + if (baseSchema != null) { + validationIssues.addAll(baseSchema.getValidationIssues()); + } + } + + /******* Value Access/Interpretation helper methods *******/ + public <T> T getOptionalKeyAsType(Map valueMap, String key, Class<T> targetClass, String wrapperName, T defaultValue) { + return getKeyAsType(valueMap, key, targetClass, wrapperName, false, defaultValue); + } + + public <T> T getRequiredKeyAsType(Map valueMap, String key, Class<T> targetClass, String wrapperName) { + return getKeyAsType(valueMap, key, targetClass, wrapperName, true, null); + } + + <T> T getKeyAsType(Map valueMap, String key, Class<T> targetClass, String wrapperName, boolean required, T defaultValue) { + Object value = valueMap.get(key); + if (value == null) { + if (defaultValue != null) { + return defaultValue; + } else if(required) { + addValidationIssue(key, wrapperName, IT_WAS_NOT_FOUND_AND_IT_IS_REQUIRED); + } + } else { + if (targetClass.isInstance(value)) { + return (T) value; + } else { + addValidationIssue(key, wrapperName, "it is found but could not be parsed as a " + targetClass.getSimpleName()); + } + } + return null; + } + + + public <T> T getMapAsType(Map valueMap, String key, Class<T> targetClass, String wrapperName, boolean required) { + Object obj = valueMap.get(key); + return interpretValueAsType(obj, key, targetClass, wrapperName, required, true); + } + + public <T> T getMapAsType(Map valueMap, String key, Class targetClass, String wrapperName, boolean required, boolean instantiateIfNull) { + Object obj = valueMap.get(key); + return interpretValueAsType(obj, key, targetClass, wrapperName, required, instantiateIfNull); + } + + public <T> void transformListToType(List<T> list, String simpleListType, Class<T> targetClass, String wrapperName){ + for (int i = 0; i < list.size(); i++) { + T obj = interpretValueAsType(list.get(i), simpleListType + " number " + i, targetClass, wrapperName, false, false); + if (obj != null) { + list.set(i, obj); + } + } + } + + private <T> T interpretValueAsType(Object obj, String key, Class targetClass, String wrapperName, boolean required, boolean instantiateIfNull) { + if (obj == null) { + if (required){ + addValidationIssue(key, wrapperName, "it is a required property but was not found"); + } else { + if(instantiateIfNull) { + try { + return (T) targetClass.newInstance(); + } catch (InstantiationException | IllegalAccessException e) { + addValidationIssue(key, wrapperName, "no value was given, and it is supposed to be created with default values as a default, and when attempting to create it the following " + + "exception was thrown:" + e.getMessage()); + } + } + } + } else if (obj instanceof Map) { + Constructor<?> constructor; + try { + constructor = targetClass.getConstructor(Map.class); + return (T) constructor.newInstance((Map) obj); + } catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException e) { + addValidationIssue(key, wrapperName, "it is found as a map and when attempting to interpret it the following exception was thrown:" + e.getMessage()); + } + } else { + try { + return (T) obj; + } catch (ClassCastException e) { + addValidationIssue(key, wrapperName, "it is found but could not be parsed as a map"); + } + } + return null; + } + + public abstract Map<String, Object> toMap(); + + public static void putIfNotNull(Map valueMap, String key, BaseSchema schema) { + if (schema != null) { + valueMap.put(key, schema.toMap()); + } + } + + public static void putListIfNotNull(Map valueMap, String key, List<? extends BaseSchema> list) { + if (list != null) { + valueMap.put(key, list.stream().map(BaseSchema::toMap).collect(Collectors.toList())); + } + } + + public static <T> List<T> nullToEmpty(List<T> list) { + return list == null ? Collections.emptyList() : list; + } + + public static <T> Set<T> nullToEmpty(Set<T> set) { + return set == null ? Collections.emptySet() : set; + } + + public static <K, V> Map<K, V> nullToEmpty(Map<K, V> map) { + return map == null ? Collections.emptyMap() : map; + } + + public static boolean isNullOrEmpty(final String string) { + return string == null || string.isEmpty(); + } +}