http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/misc/nar-maven-plugin/src/main/resources/META-INF/plexus/components.xml ---------------------------------------------------------------------- diff --git a/misc/nar-maven-plugin/src/main/resources/META-INF/plexus/components.xml b/misc/nar-maven-plugin/src/main/resources/META-INF/plexus/components.xml deleted file mode 100644 index 0680d18..0000000 --- a/misc/nar-maven-plugin/src/main/resources/META-INF/plexus/components.xml +++ /dev/null @@ -1,52 +0,0 @@ -<?xml version="1.0"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. 
---> -<component-set> - <components> - <component> - <role>org.apache.maven.lifecycle.mapping.LifecycleMapping</role> - <role-hint>nar</role-hint> - <implementation>org.apache.maven.lifecycle.mapping.DefaultLifecycleMapping</implementation> - <configuration> - <lifecycles> - <lifecycle> - <id>default</id> - <phases> - <process-resources>org.apache.maven.plugins:maven-resources-plugin:resources</process-resources> - <compile>org.apache.maven.plugins:maven-compiler-plugin:compile</compile> - <process-test-resources>org.apache.maven.plugins:maven-resources-plugin:testResources</process-test-resources> - <test-compile>org.apache.maven.plugins:maven-compiler-plugin:testCompile</test-compile> - <test>org.apache.maven.plugins:maven-surefire-plugin:test</test> - <package>org.apache.nifi:nar-maven-plugin:nar</package> - <install>org.apache.maven.plugins:maven-install-plugin:install</install> - <deploy>org.apache.maven.plugins:maven-deploy-plugin:deploy</deploy> - </phases> - </lifecycle> - </lifecycles> - </configuration> - </component> - <component> - <role>org.apache.maven.artifact.handler.ArtifactHandler</role> - <role-hint>nar</role-hint> - <implementation>org.apache.maven.artifact.handler.DefaultArtifactHandler</implementation> - <configuration> - <type>nar</type> - <language>java</language> - <addedToClasspath>false</addedToClasspath> - <includesDependencies>true</includesDependencies> - </configuration> - </component> - </components> -</component-set>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/misc/pom.xml ---------------------------------------------------------------------- diff --git a/misc/pom.xml b/misc/pom.xml new file mode 100644 index 0000000..5c7ca7f --- /dev/null +++ b/misc/pom.xml @@ -0,0 +1,100 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <groupId>org.apache.nifi</groupId> + <artifactId>nar-maven-plugin</artifactId> + <version>0.0.1-SNAPSHOT</version> + <packaging>maven-plugin</packaging> + <name>Apache NiFi NAR Plugin</name> + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + </properties> + <build> + <defaultGoal>install</defaultGoal> + <plugins> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.2</version> + <configuration> + <source>1.7</source> + <target>1.7</target> + </configuration> + </plugin> + <plugin> + <artifactId>maven-jar-plugin</artifactId> + <version>2.5</version> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-plugin-plugin</artifactId> + <version>3.3</version> + <executions> + <execution> + <id>default-descriptor</id> + <goals> + <goal>descriptor</goal> + </goals> + <phase>process-classes</phase> + </execution> + <execution> + <id>help-descriptor</id> + <goals> + <goal>helpmojo</goal> + </goals> + <phase>process-classes</phase> + </execution> + </executions> + </plugin> + </plugins> + </build> + <dependencies> + <dependency> + <groupId>org.apache.maven</groupId> + <artifactId>maven-plugin-api</artifactId> + <version>2.0.11</version> + </dependency> + <dependency> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <version>2.9</version> + <type>maven-plugin</type> + </dependency> + <dependency> + <!-- No code from maven-jar-plugin is actually used; it's included + just to simplify the dependencies list. 
--> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <version>2.5</version> + </dependency> + <dependency> + <groupId>org.apache.maven.plugin-tools</groupId> + <artifactId>maven-plugin-annotations</artifactId> + <version>3.3</version> + <scope>provided</scope> + </dependency> + </dependencies> + <distributionManagement> + <repository> + <id>nifi-releases</id> + <url>${nifi.repo.url}</url> + </repository> + <snapshotRepository> + <id>nifi-snapshots</id> + <url>${nifi.snapshot.repo.url}</url> + </snapshotRepository> + </distributionManagement> +</project> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/misc/src/main/java/nifi/NarMojo.java ---------------------------------------------------------------------- diff --git a/misc/src/main/java/nifi/NarMojo.java b/misc/src/main/java/nifi/NarMojo.java new file mode 100644 index 0000000..5196f73 --- /dev/null +++ b/misc/src/main/java/nifi/NarMojo.java @@ -0,0 +1,610 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package nifi; + +import java.io.File; +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.maven.archiver.MavenArchiveConfiguration; +import org.apache.maven.archiver.MavenArchiver; +import org.apache.maven.artifact.Artifact; +import org.apache.maven.artifact.DependencyResolutionRequiredException; +import org.apache.maven.artifact.factory.ArtifactFactory; +import org.apache.maven.artifact.installer.ArtifactInstaller; +import org.apache.maven.artifact.metadata.ArtifactMetadataSource; +import org.apache.maven.artifact.repository.ArtifactRepository; +import org.apache.maven.artifact.repository.ArtifactRepositoryFactory; +import org.apache.maven.artifact.resolver.ArtifactCollector; +import org.apache.maven.artifact.resolver.ArtifactNotFoundException; +import org.apache.maven.artifact.resolver.ArtifactResolutionException; +import org.apache.maven.artifact.resolver.ArtifactResolver; +import org.apache.maven.plugin.AbstractMojo; +import org.apache.maven.plugin.MojoExecutionException; +import org.apache.maven.plugin.MojoFailureException; +import org.apache.maven.plugin.dependency.utils.DependencyStatusSets; +import org.apache.maven.plugin.dependency.utils.DependencyUtil; +import org.apache.maven.plugin.dependency.utils.filters.DestFileFilter; +import org.apache.maven.plugin.dependency.utils.resolvers.ArtifactsResolver; +import org.apache.maven.plugin.dependency.utils.resolvers.DefaultArtifactsResolver; +import org.apache.maven.plugin.dependency.utils.translators.ArtifactTranslator; +import org.apache.maven.plugin.dependency.utils.translators.ClassifierTypeTranslator; +import org.apache.maven.plugins.annotations.LifecyclePhase; +import org.apache.maven.plugins.annotations.Mojo; +import org.apache.maven.plugins.annotations.Parameter; +import org.apache.maven.plugins.annotations.ResolutionScope; +import org.apache.maven.project.MavenProject; +import org.apache.maven.execution.MavenSession; +import 
org.apache.maven.plugins.annotations.Component; +import org.apache.maven.project.MavenProjectHelper; +import org.apache.maven.shared.artifact.filter.collection.ArtifactFilterException; +import org.apache.maven.shared.artifact.filter.collection.ArtifactIdFilter; +import org.apache.maven.shared.artifact.filter.collection.ArtifactsFilter; +import org.apache.maven.shared.artifact.filter.collection.ClassifierFilter; +import org.apache.maven.shared.artifact.filter.collection.FilterArtifacts; +import org.apache.maven.shared.artifact.filter.collection.GroupIdFilter; +import org.apache.maven.shared.artifact.filter.collection.ScopeFilter; +import org.apache.maven.shared.artifact.filter.collection.ProjectTransitivityFilter; +import org.apache.maven.shared.artifact.filter.collection.TypeFilter; +import org.codehaus.plexus.archiver.ArchiverException; +import org.codehaus.plexus.archiver.jar.JarArchiver; +import org.codehaus.plexus.archiver.jar.ManifestException; +import org.codehaus.plexus.archiver.manager.ArchiverManager; +import org.codehaus.plexus.util.FileUtils; +import org.codehaus.plexus.util.StringUtils; + +/** + * Packages the current project as an Apache NiFi Archive (NAR). + * + * The following code is derived from maven-dependency-plugin and + * maven-jar-plugin. The functionality of CopyDependenciesMojo and JarMojo was + * simplified to the use case of NarMojo. 
+ * + */ +@Mojo(name = "nar", defaultPhase = LifecyclePhase.PACKAGE, threadSafe = false, requiresDependencyResolution = ResolutionScope.RUNTIME) +public class NarMojo extends AbstractMojo { + + private static final String[] DEFAULT_EXCLUDES = new String[]{"**/package.html"}; + private static final String[] DEFAULT_INCLUDES = new String[]{"**/**"}; + + /** + * POM + * + */ + @Parameter(defaultValue = "${project}", readonly = true, required = true) + protected MavenProject project; + + @Parameter(defaultValue = "${session}", readonly = true, required = true) + protected MavenSession session; + + /** + * List of files to include. Specified as fileset patterns. + */ + @Parameter(property = "includes") + protected String[] includes; + /** + * List of files to exclude. Specified as fileset patterns. + */ + @Parameter(property = "excludes") + protected String[] excludes; + /** + * Name of the generated NAR. + * + */ + @Parameter(alias = "narName", property = "nar.finalName", defaultValue = "${project.build.finalName}", required = true) + protected String finalName; + + /** + * The Jar archiver. + * + * \@\component role="org.codehaus.plexus.archiver.Archiver" roleHint="jar" + */ + @Component(role = org.codehaus.plexus.archiver.Archiver.class, hint = "jar") + private JarArchiver jarArchiver; + /** + * The archive configuration to use. + * + * See <a + * href="http://maven.apache.org/shared/maven-archiver/index.html">the + * documentation for Maven Archiver</a>. + * + */ + @Parameter(property = "archive") + protected final MavenArchiveConfiguration archive = new MavenArchiveConfiguration(); + /** + * Path to the default MANIFEST file to use. It will be used if + * <code>useDefaultManifestFile</code> is set to <code>true</code>. 
+ * + */ + @Parameter(property = "defaultManifestFiles", defaultValue = "${project.build.outputDirectory}/META-INF/MANIFEST.MF", readonly = true, required = true) + protected File defaultManifestFile; + + /** + * Set this to <code>true</code> to enable the use of the + * <code>defaultManifestFile</code>. + * + * @since 2.2 + */ + @Parameter(property = "nar.useDefaultManifestFile", defaultValue = "false") + protected boolean useDefaultManifestFile; + + @Component + protected MavenProjectHelper projectHelper; + + /** + * Whether creating the archive should be forced. + * + */ + @Parameter(property = "nar.forceCreation", defaultValue = "false") + protected boolean forceCreation; + + /** + * Classifier to add to the artifact generated. If given, the artifact will + * be an attachment instead. + * + */ + @Parameter(property = "classifier") + protected String classifier; + + @Component + protected ArtifactInstaller installer; + + @Component + protected ArtifactRepositoryFactory repositoryFactory; + + /** + * This only applies if the classifier parameter is used. + * + */ + @Parameter(property = "mdep.failOnMissingClassifierArtifact", defaultValue = "true", required = false) + protected boolean failOnMissingClassifierArtifact = true; + + /** + * Comma Separated list of Types to include. Empty String indicates include + * everything (default). + * + */ + @Parameter(property = "includeTypes", required = false) + protected String includeTypes; + + /** + * Comma Separated list of Types to exclude. Empty String indicates don't + * exclude anything (default). + * + */ + @Parameter(property = "excludeTypes", required = false) + protected String excludeTypes; + + /** + * Scope to include. An Empty string indicates all scopes (default). + * + */ + @Parameter(property = "includeScope", required = false) + protected String includeScope; + + /** + * Scope to exclude. An Empty string indicates no scopes (default). 
+ * + */ + @Parameter(property = "excludeScope", required = false) + protected String excludeScope; + + /** + * Comma Separated list of Classifiers to include. Empty String indicates + * include everything (default). + * + */ + @Parameter(property = "includeClassifiers", required = false) + protected String includeClassifiers; + + /** + * Comma Separated list of Classifiers to exclude. Empty String indicates + * don't exclude anything (default). + * + */ + @Parameter(property = "excludeClassifiers", required = false) + protected String excludeClassifiers; + + /** + * Specify classifier to look for. Example: sources + * + */ + @Parameter(property = "classifier", required = false) + protected String copyDepClassifier; + + /** + * Specify type to look for when constructing artifact based on classifier. + * Example: java-source,jar,war, nar + * + */ + @Parameter(property = "type", required = false, defaultValue = "nar") + protected String type; + + /** + * Comma separated list of Artifact names to exclude. + * + */ + @Parameter(property = "excludeArtifacts", required = false) + protected String excludeArtifactIds; + + /** + * Comma separated list of Artifact names to include. + * + */ + @Parameter(property = "includeArtifacts", required = false) + protected String includeArtifactIds; + + /** + * Comma separated list of GroupId Names to exclude. + * + */ + @Parameter(property = "excludeGroupIds", required = false) + protected String excludeGroupIds; + + /** + * Comma separated list of GroupIds to include. 
+ * + */ + @Parameter(property = "includeGroupIds", required = false) + protected String includeGroupIds; + + /** + * Directory to store flag files + * + */ + @Parameter(property = "markersDirectory", required = false, defaultValue = "${project.build.directory}/dependency-maven-plugin-markers") + protected File markersDirectory; + + /** + * Overwrite release artifacts + * + */ + @Parameter(property = "overWriteReleases", required = false) + protected boolean overWriteReleases; + + /** + * Overwrite snapshot artifacts + * + */ + @Parameter(property = "overWriteSnapshots", required = false) + protected boolean overWriteSnapshots; + + /** + * Overwrite artifacts that don't exist or are older than the source. + * + */ + @Parameter(property = "overWriteIfNewer", required = false, defaultValue = "true") + protected boolean overWriteIfNewer; + + /** + * Used to look up Artifacts in the remote repository. + */ + @Component + protected ArtifactFactory factory; + + /** + * Used to look up Artifacts in the remote repository. + * + */ + @Component + protected ArtifactResolver resolver; + + /** + * Artifact collector, needed to resolve dependencies. + * + */ + @Component(role = org.apache.maven.artifact.resolver.ArtifactCollector.class) + protected ArtifactCollector artifactCollector; + + @Component(role = org.apache.maven.artifact.metadata.ArtifactMetadataSource.class) + protected ArtifactMetadataSource artifactMetadataSource; + + /** + * Location of the local repository. + * + */ + @Parameter(property = "localRepository", required = true, readonly = true) + protected ArtifactRepository local; + + /** + * List of Remote Repositories used by the resolver + * + */ + @Parameter(property = "project.remoteArtifactRepositories", required = true, readonly = true) + protected List remoteRepos; + + /** + * To look up Archiver/UnArchiver implementations + * + */ + @Component + protected ArchiverManager archiverManager; + + /** + * Contains the full list of projects in the reactor. 
+ * + */ + @Parameter(property = "reactorProjects", required = true, readonly = true) + protected List reactorProjects; + + /** + * If the plugin should be silent. + * + */ + @Parameter(property = "silent", required = false, defaultValue = "false") + public boolean silent; + + /** + * Output absolute filename for resolved artifacts + * + */ + @Parameter(property = "outputAbsoluteArtifactFilename", defaultValue = "false", required = false) + protected boolean outputAbsoluteArtifactFilename; + + @Override + public void execute() throws MojoExecutionException, MojoFailureException { + copyDependencies(); + makeNar(); + } + + private void copyDependencies() throws MojoExecutionException { + DependencyStatusSets dss = getDependencySets(this.failOnMissingClassifierArtifact); + Set artifacts = dss.getResolvedDependencies(); + + for (Object artifactObj : artifacts) { + copyArtifact((Artifact) artifactObj); + } + + artifacts = dss.getSkippedDependencies(); + for (Object artifactOjb : artifacts) { + Artifact artifact = (Artifact) artifactOjb; + getLog().info(artifact.getFile().getName() + " already exists in destination."); + } + } + + protected void copyArtifact(Artifact artifact) throws MojoExecutionException { + String destFileName = DependencyUtil.getFormattedFileName(artifact, false); + final File destDir = DependencyUtil.getFormattedOutputDirectory(false, false, false, false, false, getDependenciesDirectory(), artifact); + final File destFile = new File(destDir, destFileName); + copyFile(artifact.getFile(), destFile); + } + + protected Artifact getResolvedPomArtifact(Artifact artifact) { + Artifact pomArtifact = this.factory.createArtifact(artifact.getGroupId(), artifact.getArtifactId(), artifact.getVersion(), "", "pom"); + // Resolve the pom artifact using repos + try { + this.resolver.resolve(pomArtifact, this.remoteRepos, this.local); + } catch (ArtifactResolutionException | ArtifactNotFoundException e) { + getLog().info(e.getMessage()); + } + return pomArtifact; + 
} + + protected ArtifactsFilter getMarkedArtifactFilter() { + return new DestFileFilter(this.overWriteReleases, this.overWriteSnapshots, this.overWriteIfNewer, false, false, false, false, false, getDependenciesDirectory()); + } + + protected DependencyStatusSets getDependencySets(boolean stopOnFailure) throws MojoExecutionException { + // add filters in well known order, least specific to most specific + FilterArtifacts filter = new FilterArtifacts(); + + filter.addFilter(new ProjectTransitivityFilter(project.getDependencyArtifacts(), false)); + filter.addFilter(new ScopeFilter(this.includeScope, this.excludeScope)); + filter.addFilter(new TypeFilter(this.includeTypes, this.excludeTypes)); + filter.addFilter(new ClassifierFilter(this.includeClassifiers, this.excludeClassifiers)); + filter.addFilter(new GroupIdFilter(this.includeGroupIds, this.excludeGroupIds)); + filter.addFilter(new ArtifactIdFilter(this.includeArtifactIds, this.excludeArtifactIds)); + + // explicitly filter out nar dependencies + filter.addFilter(new TypeFilter("", "nar")); + + // start with all artifacts. 
+ Set artifacts = project.getArtifacts(); + + // perform filtering + try { + artifacts = filter.filter(artifacts); + } catch (ArtifactFilterException e) { + throw new MojoExecutionException(e.getMessage(), e); + } + + // transform artifacts if classifier is set + final DependencyStatusSets status; + if (StringUtils.isNotEmpty(copyDepClassifier)) { + status = getClassifierTranslatedDependencies(artifacts, stopOnFailure); + } else { + status = filterMarkedDependencies(artifacts); + } + + return status; + } + + protected DependencyStatusSets getClassifierTranslatedDependencies(Set artifacts, boolean stopOnFailure) throws MojoExecutionException { + Set unResolvedArtifacts = new HashSet(); + Set resolvedArtifacts = artifacts; + DependencyStatusSets status = new DependencyStatusSets(); + + // possibly translate artifacts into a new set of artifacts based on the + // classifier and type + // if this did something, we need to resolve the new artifacts + if (StringUtils.isNotEmpty(copyDepClassifier)) { + ArtifactTranslator translator = new ClassifierTypeTranslator(this.copyDepClassifier, this.type, this.factory); + artifacts = translator.translate(artifacts, getLog()); + + status = filterMarkedDependencies(artifacts); + + // the unskipped artifacts are in the resolved set. + artifacts = status.getResolvedDependencies(); + + // resolve the rest of the artifacts + ArtifactsResolver artifactsResolver = new DefaultArtifactsResolver(this.resolver, this.local, + this.remoteRepos, stopOnFailure); + resolvedArtifacts = artifactsResolver.resolve(artifacts, getLog()); + + // calculate the artifacts not resolved. + unResolvedArtifacts.addAll(artifacts); + unResolvedArtifacts.removeAll(resolvedArtifacts); + } + + // return a bean of all 3 sets. 
+ status.setResolvedDependencies(resolvedArtifacts); + status.setUnResolvedDependencies(unResolvedArtifacts); + + return status; + } + + protected DependencyStatusSets filterMarkedDependencies(Set artifacts) throws MojoExecutionException { + // remove files that have markers already + FilterArtifacts filter = new FilterArtifacts(); + filter.clearFilters(); + filter.addFilter(getMarkedArtifactFilter()); + + Set unMarkedArtifacts; + try { + unMarkedArtifacts = filter.filter(artifacts); + } catch (ArtifactFilterException e) { + throw new MojoExecutionException(e.getMessage(), e); + } + + // calculate the skipped artifacts + Set skippedArtifacts = new HashSet(); + skippedArtifacts.addAll(artifacts); + skippedArtifacts.removeAll(unMarkedArtifacts); + + return new DependencyStatusSets(unMarkedArtifacts, null, skippedArtifacts); + } + + protected void copyFile(File artifact, File destFile) throws MojoExecutionException { + try { + getLog().info("Copying " + (this.outputAbsoluteArtifactFilename ? 
artifact.getAbsolutePath() : artifact.getName()) + " to " + destFile); + FileUtils.copyFile(artifact, destFile); + } catch (Exception e) { + throw new MojoExecutionException("Error copying artifact from " + artifact + " to " + destFile, e); + } + } + + private File getClassesDirectory() { + final File outputDirectory = new File(project.getBasedir(), "target"); + return new File(outputDirectory, "classes"); + } + + private File getDependenciesDirectory() { + return new File(getClassesDirectory(), "META-INF/dependencies"); + } + + private void makeNar() throws MojoExecutionException { + File narFile = createArchive(); + + if (classifier != null) { + projectHelper.attachArtifact(project, "nar", classifier, narFile); + } else { + project.getArtifact().setFile(narFile); + } + } + + public File createArchive() throws MojoExecutionException { + final File outputDirectory = new File(project.getBasedir(), "target"); + File narFile = getNarFile(outputDirectory, finalName, classifier); + MavenArchiver archiver = new MavenArchiver(); + archiver.setArchiver(jarArchiver); + archiver.setOutputFile(narFile); + archive.setForced(forceCreation); + + try { + File contentDirectory = getClassesDirectory(); + if (!contentDirectory.exists()) { + getLog().warn("NAR will be empty - no content was marked for inclusion!"); + } else { + archiver.getArchiver().addDirectory(contentDirectory, getIncludes(), getExcludes()); + } + + File existingManifest = defaultManifestFile; + if (useDefaultManifestFile && existingManifest.exists() && archive.getManifestFile() == null) { + getLog().info("Adding existing MANIFEST to archive. 
Found under: " + existingManifest.getPath()); + archive.setManifestFile(existingManifest); + } + + // automatically add the artifact id to the manifest + archive.addManifestEntry("Nar-Id", project.getArtifactId()); + + // look for a nar dependency + String narDependency = getNarDependency(); + if (narDependency != null) { + archive.addManifestEntry("Nar-Dependency-Id", narDependency); + } + + archiver.createArchive(session, project, archive); + return narFile; + } catch (ArchiverException | MojoExecutionException | ManifestException | IOException | DependencyResolutionRequiredException e) { + throw new MojoExecutionException("Error assembling NAR", e); + } + } + + private String[] getIncludes() { + if (includes != null && includes.length > 0) { + return includes; + } + return DEFAULT_INCLUDES; + } + + private String[] getExcludes() { + if (excludes != null && excludes.length > 0) { + return excludes; + } + return DEFAULT_EXCLUDES; + } + + protected File getNarFile(File basedir, String finalName, String classifier) { + if (classifier == null) { + classifier = ""; + } else if (classifier.trim().length() > 0 && !classifier.startsWith("-")) { + classifier = "-" + classifier; + } + + return new File(basedir, finalName + classifier + ".nar"); + } + + private String getNarDependency() throws MojoExecutionException { + String narDependency = null; + + // get nar dependencies + FilterArtifacts filter = new FilterArtifacts(); + filter.addFilter(new TypeFilter("nar", "")); + + // start with all artifacts. + Set artifacts = project.getArtifacts(); + + // perform filtering + try { + artifacts = filter.filter(artifacts); + } catch (ArtifactFilterException e) { + throw new MojoExecutionException(e.getMessage(), e); + } + + // ensure there is a single nar dependency + if (artifacts.size() > 1) { + throw new MojoExecutionException("Each NAR represents a ClassLoader. A NAR dependency allows that NAR's ClassLoader to be " + + "used as the parent of this NAR's ClassLoader. 
As a result, only a single NAR dependency is allowed."); + } else if (artifacts.size() == 1) { + final Artifact artifact = (Artifact) artifacts.iterator().next(); + narDependency = artifact.getArtifactId(); + } + + return narDependency; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/misc/src/main/resources/META-INF/plexus/components.xml ---------------------------------------------------------------------- diff --git a/misc/src/main/resources/META-INF/plexus/components.xml b/misc/src/main/resources/META-INF/plexus/components.xml new file mode 100644 index 0000000..0680d18 --- /dev/null +++ b/misc/src/main/resources/META-INF/plexus/components.xml @@ -0,0 +1,52 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<component-set> + <components> + <component> + <role>org.apache.maven.lifecycle.mapping.LifecycleMapping</role> + <role-hint>nar</role-hint> + <implementation>org.apache.maven.lifecycle.mapping.DefaultLifecycleMapping</implementation> + <configuration> + <lifecycles> + <lifecycle> + <id>default</id> + <phases> + <process-resources>org.apache.maven.plugins:maven-resources-plugin:resources</process-resources> + <compile>org.apache.maven.plugins:maven-compiler-plugin:compile</compile> + <process-test-resources>org.apache.maven.plugins:maven-resources-plugin:testResources</process-test-resources> + <test-compile>org.apache.maven.plugins:maven-compiler-plugin:testCompile</test-compile> + <test>org.apache.maven.plugins:maven-surefire-plugin:test</test> + <package>org.apache.nifi:nar-maven-plugin:nar</package> + <install>org.apache.maven.plugins:maven-install-plugin:install</install> + <deploy>org.apache.maven.plugins:maven-deploy-plugin:deploy</deploy> + </phases> + </lifecycle> + </lifecycles> + </configuration> + </component> + <component> + <role>org.apache.maven.artifact.handler.ArtifactHandler</role> + <role-hint>nar</role-hint> + <implementation>org.apache.maven.artifact.handler.DefaultArtifactHandler</implementation> + <configuration> + <type>nar</type> + <language>java</language> + <addedToClasspath>false</addedToClasspath> + <includesDependencies>true</includesDependencies> + </configuration> + </component> + </components> +</component-set> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/pom.xml ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/pom.xml b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/pom.xml deleted file mode 100644 index 6280349..0000000 --- 
a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/pom.xml +++ /dev/null @@ -1,67 +0,0 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <!--
    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements. See the NOTICE file distributed with
    this work for additional information regarding copyright ownership.
    The ASF licenses this file to You under the Apache License, Version 2.0
    (the "License"); you may not use this file except in compliance with
    the License. You may obtain a copy of the License at
    http://www.apache.org/licenses/LICENSE-2.0
    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
  -->
  <modelVersion>4.0.0</modelVersion>

  <!-- Module coordinates: group and version come from the parent bundle. -->
  <parent>
    <groupId>org.apache.nifi</groupId>
    <artifactId>distributed-cache-services-bundle</artifactId>
    <version>0.0.1-SNAPSHOT</version>
  </parent>

  <artifactId>distributed-cache-client-service</artifactId>
  <packaging>jar</packaging>

  <name>Distributed Cache Client Service</name>
  <description>Provides a Client for interfacing with a Distributed Cache</description>

  <dependencies>
    <!-- Dependencies without a <version> presumably resolve through inherited dependencyManagement; verify against the parent POM. -->
    <dependency>
      <groupId>org.apache.nifi</groupId>
      <artifactId>nifi-api</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.nifi</groupId>
      <artifactId>distributed-cache-client-service-api</artifactId>
    </dependency>
    <!-- Pinned to this module's own version. -->
    <dependency>
      <groupId>org.apache.nifi</groupId>
      <artifactId>distributed-cache-protocol</artifactId>
      <version>${project.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.nifi</groupId>
      <artifactId>remote-communications-utils</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.nifi</groupId>
      <artifactId>nifi-processor-utils</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.nifi</groupId>
      <artifactId>nifi-stream-utils</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.nifi</groupId>
      <artifactId>ssl-context-service-api</artifactId>
    </dependency>

    <!-- Test-only dependency. -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.9</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CommsSession.java ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CommsSession.java 
b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CommsSession.java deleted file mode 100644 index f838c2f..0000000 --- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CommsSession.java +++ /dev/null @@ -1,46 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.distributed.cache.client;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.TimeUnit;

import javax.net.ssl.SSLContext;

/**
 * A single connection between a cache client and a cache server.
 *
 * <p>The session exposes the two streams used to talk to the peer, plus the
 * connection metadata (host, port, timeout, SSL context). Closing the session
 * via {@link Closeable#close()} releases the underlying connection.</p>
 */
public interface CommsSession extends Closeable {

    // ---- streams ---------------------------------------------------------

    /**
     * @return the stream from which data sent by the peer is read
     * @throws IOException if the stream cannot be provided
     */
    InputStream getInputStream() throws IOException;

    /**
     * @return the stream to which data destined for the peer is written
     * @throws IOException if the stream cannot be provided
     */
    OutputStream getOutputStream() throws IOException;

    // ---- lifecycle -------------------------------------------------------

    /**
     * @return {@code true} if this session can no longer be used
     */
    boolean isClosed();

    /**
     * Interrupts I/O that is in progress on this session.
     */
    void interrupt();

    // ---- timeout ---------------------------------------------------------

    /**
     * Sets the communications timeout for this session.
     *
     * @param value the timeout amount
     * @param timeUnit the unit in which {@code value} is expressed
     */
    void setTimeout(long value, TimeUnit timeUnit);

    /**
     * @param timeUnit the unit in which the result should be expressed
     * @return the configured timeout, converted to {@code timeUnit}
     */
    long getTimeout(TimeUnit timeUnit);

    // ---- connection metadata ----------------------------------------------

    /**
     * @return the hostname this session was created for
     */
    String getHostname();

    /**
     * @return the port this session was created for
     */
    int getPort();

    /**
     * @return the SSL context securing this session; implementations without
     *         encryption may return {@code null}
     */
    SSLContext getSSLContext();
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java deleted file mode 100644 index ee96660..0000000 --- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java +++ /dev/null @@ -1,301 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */
package org.apache.nifi.distributed.cache.client;

import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.annotation.OnConfigured;
import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake;
import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException;
import org.apache.nifi.io.ByteArrayOutputStream;
import org.apache.nifi.io.DataOutputStream;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.remote.StandardVersionNegotiator;
import org.apache.nifi.remote.VersionNegotiator;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.ssl.SSLContextService.ClientAuth;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Controller service that talks to a remote DistributedMapCacheServer.
 *
 * <p>Every operation leases a {@link CommsSession} from an internal pool
 * (creating and hand-shaking a new one when the pool has no usable session),
 * writes the operation name followed by length-prefixed serialized arguments,
 * flushes, and reads the reply. A session is returned to the pool only after
 * the operation completed normally; after any failure it is closed, because
 * its streams may hold a partially written request or partially read reply.</p>
 */
public class DistributedMapCacheClientService extends AbstractControllerService implements DistributedMapCacheClient {

    private static final Logger logger = LoggerFactory.getLogger(DistributedMapCacheClientService.class);

    public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
            .name("Server Hostname")
            .description("The name of the server that is running the DistributedMapCacheServer service")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();
    public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
            .name("Server Port")
            .description("The port on the remote server that is to be used when communicating with the DistributedMapCacheServer service")
            .required(true)
            .addValidator(StandardValidators.PORT_VALIDATOR)
            .defaultValue("4557")
            .build();
    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
            .name("SSL Context Service")
            .description(
                    "If specified, indicates the SSL Context Service that is used to communicate with the remote server. If not specified, communications will not be encrypted")
            .required(false)
            .addValidator(StandardValidators.createControllerServiceExistsValidator(SSLContextService.class))
            .defaultValue(null)
            .build();
    public static final PropertyDescriptor COMMUNICATIONS_TIMEOUT = new PropertyDescriptor.Builder()
            .name("Communications Timeout")
            .description(
                    "Specifies how long to wait when communicating with the remote server before determining that there is a communications failure if data cannot be sent or received")
            .required(true)
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .defaultValue("30 secs")
            .build();

    /** Pool of idle sessions that have already completed the protocol handshake. */
    private final BlockingQueue<CommsSession> queue = new LinkedBlockingQueue<>();
    private volatile ConfigurationContext configContext;
    private volatile boolean closed = false;

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> descriptors = new ArrayList<>();
        descriptors.add(HOSTNAME);
        descriptors.add(PORT);
        descriptors.add(SSL_CONTEXT_SERVICE);
        descriptors.add(COMMUNICATIONS_TIMEOUT);
        return descriptors;
    }

    /**
     * Remembers the configuration so that sessions can be created lazily later on.
     *
     * @param context the configuration this service was configured with
     */
    @OnConfigured
    public void cacheConfig(final ConfigurationContext context) {
        this.configContext = context;
    }

    @Override
    public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer)
            throws IOException {
        return withCommsSession(new CommsAction<Boolean>() {
            @Override
            public Boolean execute(final CommsSession session) throws IOException {
                final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
                dos.writeUTF("putIfAbsent");

                serialize(key, keySerializer, dos);
                serialize(value, valueSerializer, dos);

                dos.flush();

                final DataInputStream dis = new DataInputStream(session.getInputStream());
                return dis.readBoolean();
            }
        });
    }

    @Override
    public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException {
        return withCommsSession(new CommsAction<Boolean>() {
            @Override
            public Boolean execute(final CommsSession session) throws IOException {
                final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
                dos.writeUTF("containsKey");

                serialize(key, keySerializer, dos);
                dos.flush();

                final DataInputStream dis = new DataInputStream(session.getInputStream());
                return dis.readBoolean();
            }
        });
    }

    @Override
    public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer,
            final Deserializer<V> valueDeserializer) throws IOException {
        return withCommsSession(new CommsAction<V>() {
            @Override
            public V execute(final CommsSession session) throws IOException {
                final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
                dos.writeUTF("getAndPutIfAbsent");

                serialize(key, keySerializer, dos);
                serialize(value, valueSerializer, dos);
                dos.flush();

                // read response
                final DataInputStream dis = new DataInputStream(session.getInputStream());
                final byte[] responseBuffer = readLengthDelimitedResponse(dis);
                return valueDeserializer.deserialize(responseBuffer);
            }
        });
    }

    @Override
    public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
        return withCommsSession(new CommsAction<V>() {
            @Override
            public V execute(final CommsSession session) throws IOException {
                final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
                dos.writeUTF("get");

                serialize(key, keySerializer, dos);
                dos.flush();

                // read response
                final DataInputStream dis = new DataInputStream(session.getInputStream());
                final byte[] responseBuffer = readLengthDelimitedResponse(dis);
                return valueDeserializer.deserialize(responseBuffer);
            }
        });
    }

    @Override
    public <K> boolean remove(final K key, final Serializer<K> serializer) throws IOException {
        return withCommsSession(new CommsAction<Boolean>() {
            @Override
            public Boolean execute(final CommsSession session) throws IOException {
                final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
                dos.writeUTF("remove");

                serialize(key, serializer, dos);
                dos.flush();

                // read response
                final DataInputStream dis = new DataInputStream(session.getInputStream());
                return dis.readBoolean();
            }
        });
    }

    /**
     * Reads a 4-byte length followed by that many bytes.
     *
     * @param dis the stream positioned at the start of the response
     * @return the response payload (possibly empty)
     * @throws IOException if the stream fails, ends early, or announces a negative length
     */
    private byte[] readLengthDelimitedResponse(final DataInputStream dis) throws IOException {
        final int responseLength = dis.readInt();
        if (responseLength < 0) {
            // Report a corrupt frame as an I/O failure rather than a NegativeArraySizeException.
            throw new IOException("Received invalid response length " + responseLength);
        }

        final byte[] responseBuffer = new byte[responseLength];
        dis.readFully(responseBuffer);
        return responseBuffer;
    }

    /**
     * Opens a new connection to the configured server. No handshake is performed here.
     *
     * @param context the configuration supplying hostname, port, timeout and optional SSL service
     * @return a connected session with the configured timeout applied
     * @throws IOException if the connection cannot be established
     */
    public CommsSession createCommsSession(final ConfigurationContext context) throws IOException {
        final String hostname = context.getProperty(HOSTNAME).getValue();
        final int port = context.getProperty(PORT).asInteger();
        final long timeoutMillis = context.getProperty(COMMUNICATIONS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
        final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);

        final CommsSession commsSession;
        if (sslContextService == null) {
            commsSession = new StandardCommsSession(hostname, port);
        } else {
            commsSession = new SSLCommsSession(sslContextService.createSSLContext(ClientAuth.REQUIRED), hostname, port);
        }

        commsSession.setTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
        return commsSession;
    }

    /**
     * Returns a pooled session that is still open, or creates and hand-shakes a new one.
     * A session whose handshake fails for any reason is closed before the failure propagates.
     */
    private CommsSession leaseCommsSession() throws IOException {
        // Skip over pooled sessions that have died while idle.
        CommsSession pooled;
        while ((pooled = queue.poll()) != null) {
            if (!pooled.isClosed()) {
                return pooled;
            }
        }

        final CommsSession session = createCommsSession(configContext);
        final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(1);
        try {
            ProtocolHandshake.initiateHandshake(session.getInputStream(), session.getOutputStream(), versionNegotiator);
        } catch (final HandshakeException e) {
            closeQuietly(session);
            throw new IOException(e);
        } catch (final IOException | RuntimeException e) {
            // Previously the freshly opened connection leaked on these paths.
            closeQuietly(session);
            throw e;
        }

        return session;
    }

    /**
     * Marks the client closed and shuts down every pooled session, telling the
     * server "close" on a best-effort basis first.
     */
    @Override
    public void close() throws IOException {
        this.closed = true;

        CommsSession commsSession;
        while ((commsSession = queue.poll()) != null) {
            try (final DataOutputStream dos = new DataOutputStream(commsSession.getOutputStream())) {
                dos.writeUTF("close");
                dos.flush();
            } catch (final IOException e) {
                // Best effort only: the connection is being discarded anyway.
                logger.debug("Failed to notify server while closing session", e);
            } finally {
                // Always release the connection, even when the notification failed.
                closeQuietly(commsSession);
            }
        }
        logger.info("Closed {}", new Object[] { getIdentifier() });
    }

    @Override
    protected void finalize() throws Throwable {
        try {
            if (!closed) {
                close();
            }
            logger.debug("Finalize called");
        } finally {
            super.finalize();
        }
    }

    /** Writes {@code value} to {@code dos} as a 4-byte length followed by the serialized bytes. */
    private <T> void serialize(final T value, final Serializer<T> serializer, final DataOutputStream dos) throws IOException {
        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        serializer.serialize(value, baos);
        dos.writeInt(baos.size());
        baos.writeTo(dos);
    }

    /**
     * Runs {@code action} against a leased session and re-pools the session only
     * when the action completed normally and this client is still open.
     *
     * @throws IllegalStateException if this client has been closed
     * @throws IOException if leasing the session or the action itself fails
     */
    private <T> T withCommsSession(final CommsAction<T> action) throws IOException {
        if (closed) {
            throw new IllegalStateException("Client is closed");
        }

        final CommsSession session = leaseCommsSession();
        boolean succeeded = false;
        try {
            final T result = action.execute(session);
            succeeded = true;
            return result;
        } finally {
            if (!succeeded || this.closed) {
                // After a failure (checked or unchecked) the streams may hold half a
                // request/response, so the session must never be reused.
                closeQuietly(session);
            } else if (!session.isClosed()) {
                queue.offer(session);
                // close() may have drained the pool between the check above and the offer.
                if (this.closed && queue.remove(session)) {
                    closeQuietly(session);
                }
            }
        }
    }

    /** Closes the session, ignoring failures; used only on paths that discard the session. */
    private static void closeQuietly(final CommsSession session) {
        try {
            session.close();
        } catch (final IOException ignored) {
            // nothing useful can be done here
        }
    }

    private static interface CommsAction<T> {
        T execute(CommsSession commsSession) throws IOException;
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java deleted file mode 100644 index 1d7c94c..0000000 --- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java +++ /dev/null @@ -1,212 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */
package org.apache.nifi.distributed.cache.client;

import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.annotation.OnConfigured;
import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake;
import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException;
import org.apache.nifi.io.ByteArrayOutputStream;
import org.apache.nifi.io.DataOutputStream;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.remote.StandardVersionNegotiator;
import org.apache.nifi.remote.VersionNegotiator;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.ssl.SSLContextService.ClientAuth;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Controller service that talks to a remote DistributedSetCacheServer.
 *
 * <p>Each operation leases a {@link CommsSession} from an internal pool
 * (creating and hand-shaking a new one when the pool has no usable session),
 * sends the operation name and one length-prefixed serialized value, and reads
 * back a single boolean. A session is returned to the pool only after the
 * operation completed normally; after any failure it is closed.</p>
 */
public class DistributedSetCacheClientService extends AbstractControllerService implements DistributedSetCacheClient {

    // Bound to this class; it was previously bound to DistributedMapCacheClientService,
    // which attributed every log line of the set client to the map client.
    private static final Logger logger = LoggerFactory.getLogger(DistributedSetCacheClientService.class);

    public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
            .name("Server Hostname")
            .description("The name of the server that is running the DistributedSetCacheServer service")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();
    public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
            .name("Server Port")
            .description("The port on the remote server that is to be used when communicating with the DistributedSetCacheServer service")
            .required(true)
            .addValidator(StandardValidators.PORT_VALIDATOR)
            .defaultValue("4557")
            .build();
    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
            .name("SSL Context Service")
            .description(
                    "If specified, indicates the SSL Context Service that is used to communicate with the remote server. If not specified, communications will not be encrypted")
            .required(false)
            .addValidator(StandardValidators.createControllerServiceExistsValidator(SSLContextService.class))
            .defaultValue(null)
            .build();
    public static final PropertyDescriptor COMMUNICATIONS_TIMEOUT = new PropertyDescriptor.Builder()
            .name("Communications Timeout")
            .description(
                    "Specifies how long to wait when communicating with the remote server before determining that there is a communications failure if data cannot be sent or received")
            .required(true)
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .defaultValue("30 secs")
            .build();

    /** Pool of idle sessions that have already completed the protocol handshake. */
    private final BlockingQueue<CommsSession> queue = new LinkedBlockingQueue<>();
    private volatile ConfigurationContext configContext;
    private volatile boolean closed = false;

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> descriptors = new ArrayList<>();
        descriptors.add(HOSTNAME);
        descriptors.add(PORT);
        descriptors.add(SSL_CONTEXT_SERVICE);
        descriptors.add(COMMUNICATIONS_TIMEOUT);
        return descriptors;
    }

    /**
     * Remembers the configuration so that sessions can be created lazily later on.
     *
     * @param context the configuration this service was configured with
     */
    @OnConfigured
    public void onConfigured(final ConfigurationContext context) {
        this.configContext = context;
    }

    /**
     * Opens a new connection to the configured server. No handshake is performed here.
     *
     * @param context the configuration supplying hostname, port, timeout and optional SSL service
     * @return a connected session with the configured timeout applied
     * @throws IOException if the connection cannot be established
     */
    public CommsSession createCommsSession(final ConfigurationContext context) throws IOException {
        final String hostname = context.getProperty(HOSTNAME).getValue();
        final int port = context.getProperty(PORT).asInteger();
        final long timeoutMillis = context.getProperty(COMMUNICATIONS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
        final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);

        final CommsSession commsSession;
        if (sslContextService == null) {
            commsSession = new StandardCommsSession(hostname, port);
        } else {
            commsSession = new SSLCommsSession(sslContextService.createSSLContext(ClientAuth.REQUIRED), hostname, port);
        }

        commsSession.setTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
        return commsSession;
    }

    /**
     * Returns a pooled session that is still open, or creates and hand-shakes a new one.
     * A session whose handshake fails for any reason is closed before the failure propagates.
     */
    private CommsSession leaseCommsSession() throws IOException {
        // Skip over pooled sessions that have died while idle.
        CommsSession pooled;
        while ((pooled = queue.poll()) != null) {
            if (!pooled.isClosed()) {
                return pooled;
            }
        }

        final CommsSession session = createCommsSession(configContext);
        final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(1);
        try {
            ProtocolHandshake.initiateHandshake(session.getInputStream(), session.getOutputStream(), versionNegotiator);
        } catch (final HandshakeException e) {
            closeQuietly(session);
            throw new IOException(e);
        } catch (final IOException | RuntimeException e) {
            // Previously the freshly opened connection leaked on these paths.
            closeQuietly(session);
            throw e;
        }

        return session;
    }

    @Override
    public <T> boolean addIfAbsent(final T value, final Serializer<T> serializer) throws IOException {
        return invokeRemoteBoolean("addIfAbsent", value, serializer);
    }

    @Override
    public <T> boolean contains(final T value, final Serializer<T> serializer) throws IOException {
        return invokeRemoteBoolean("contains", value, serializer);
    }

    @Override
    public <T> boolean remove(final T value, final Serializer<T> serializer) throws IOException {
        return invokeRemoteBoolean("remove", value, serializer);
    }

    /**
     * Marks the client closed and shuts down every pooled session, telling the
     * server "close" on a best-effort basis first.
     */
    @Override
    public void close() throws IOException {
        this.closed = true;

        CommsSession commsSession;
        while ((commsSession = queue.poll()) != null) {
            try (final DataOutputStream dos = new DataOutputStream(commsSession.getOutputStream())) {
                dos.writeUTF("close");
                dos.flush();
            } catch (final IOException e) {
                // Best effort only: the connection is being discarded anyway.
                logger.debug("Failed to notify server while closing session", e);
            } finally {
                // Always release the connection, even when the notification failed.
                closeQuietly(commsSession);
            }
        }
        logger.info("Closed {}", new Object[] { getIdentifier() });
    }

    @Override
    protected void finalize() throws Throwable {
        try {
            if (!closed) {
                close();
            }
            logger.debug("Finalize called");
        } finally {
            super.finalize();
        }
    }

    /**
     * Sends {@code methodName} plus the length-prefixed serialized {@code value}
     * and returns the boolean the server answers with.
     *
     * @throws IllegalStateException if this client has been closed
     * @throws IOException if leasing the session or the exchange itself fails
     */
    private <T> boolean invokeRemoteBoolean(final String methodName, final T value, final Serializer<T> serializer) throws IOException {
        if (closed) {
            throw new IllegalStateException("Client is closed");
        }

        final CommsSession session = leaseCommsSession();
        boolean succeeded = false;
        try {
            final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
            dos.writeUTF(methodName);

            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
            serializer.serialize(value, baos);
            dos.writeInt(baos.size());
            baos.writeTo(dos);
            dos.flush();

            final DataInputStream dis = new DataInputStream(session.getInputStream());
            final boolean result = dis.readBoolean();
            succeeded = true;
            return result;
        } finally {
            if (!succeeded || this.closed) {
                // After a failure (checked or unchecked) the streams may hold half a
                // request/response, so the session must never be reused.
                closeQuietly(session);
            } else if (!session.isClosed()) {
                queue.offer(session);
                // close() may have drained the pool between the check above and the offer.
                if (this.closed && queue.remove(session)) {
                    closeQuietly(session);
                }
            }
        }
    }

    /** Closes the session, ignoring failures; used only on paths that discard the session. */
    private static void closeQuietly(final CommsSession session) {
        try {
            session.close();
        } catch (final IOException ignored) {
            // nothing useful can be done here
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java deleted file mode 100644 index c8be082..0000000 --- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under 
one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */
package org.apache.nifi.distributed.cache.client;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.TimeUnit;

import javax.net.ssl.SSLContext;

import org.apache.nifi.io.BufferedInputStream;
import org.apache.nifi.io.BufferedOutputStream;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelInputStream;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelOutputStream;

/**
 * {@link CommsSession} backed by an {@link SSLSocketChannel}.
 *
 * <p>Reads and writes go through buffered wrappers around the channel's
 * streams; lifecycle and timeout calls are delegated to the channel itself.</p>
 */
public class SSLCommsSession implements CommsSession {
    private final SSLSocketChannel sslSocketChannel;
    private final SSLContext sslContext;
    private final String hostname;
    private final int port;

    private final SSLSocketChannelInputStream in;
    private final BufferedInputStream bufferedIn;

    private final SSLSocketChannelOutputStream out;
    private final BufferedOutputStream bufferedOut;

    /**
     * Creates the channel for the given endpoint and wraps its streams.
     *
     * @param sslContext the SSL context handed to the channel
     * @param hostname the remote host
     * @param port the remote port
     * @throws IOException if the channel cannot be created
     */
    public SSLCommsSession(final SSLContext sslContext, final String hostname, final int port) throws IOException {
        sslSocketChannel = new SSLSocketChannel(sslContext, hostname, port, true);

        in = new SSLSocketChannelInputStream(sslSocketChannel);
        bufferedIn = new BufferedInputStream(in);

        out = new SSLSocketChannelOutputStream(sslSocketChannel);
        bufferedOut = new BufferedOutputStream(out);

        this.sslContext = sslContext;
        this.hostname = hostname;
        this.port = port;
    }

    @Override
    public void interrupt() {
        sslSocketChannel.interrupt();
    }

    @Override
    public void close() throws IOException {
        sslSocketChannel.close();
    }

    /**
     * Applies the timeout to the channel. The channel takes an {@code int}
     * number of milliseconds, so larger values are clamped to
     * {@link Integer#MAX_VALUE} instead of wrapping around in a narrowing cast.
     */
    @Override
    public void setTimeout(final long value, final TimeUnit timeUnit) {
        final long millis = TimeUnit.MILLISECONDS.convert(value, timeUnit);
        sslSocketChannel.setTimeout((int) Math.min(millis, Integer.MAX_VALUE));
    }

    @Override
    public InputStream getInputStream() throws IOException {
        return bufferedIn;
    }

    @Override
    public OutputStream getOutputStream() throws IOException {
        return bufferedOut;
    }

    @Override
    public boolean isClosed() {
        return sslSocketChannel.isClosed();
    }

    @Override
    public String getHostname() {
        return hostname;
    }

    @Override
    public int getPort() {
        return port;
    }

    @Override
    public SSLContext getSSLContext() {
        return sslContext;
    }

    @Override
    public long getTimeout(final TimeUnit timeUnit) {
        return timeUnit.convert(sslSocketChannel.getTimeout(), TimeUnit.MILLISECONDS);
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java deleted file mode 100644 index bbe2917..0000000 --- 
a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java +++ /dev/null @@ -1,124 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.distributed.cache.client;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.util.concurrent.TimeUnit;

import javax.net.ssl.SSLContext;

import org.apache.nifi.io.BufferedInputStream;
import org.apache.nifi.io.BufferedOutputStream;
import org.apache.nifi.remote.io.InterruptableInputStream;
import org.apache.nifi.remote.io.InterruptableOutputStream;
import org.apache.nifi.remote.io.socket.SocketChannelInputStream;
import org.apache.nifi.remote.io.socket.SocketChannelOutputStream;

/**
 * Unencrypted {@link CommsSession} backed by a non-blocking {@link SocketChannel}.
 *
 * <p>The streams handed out are buffered and wrapped so that
 * {@link #interrupt()} can be forwarded to them.</p>
 */
public class StandardCommsSession implements CommsSession {
    private final SocketChannel socketChannel;
    private final String hostname;
    private final int port;
    private volatile long timeoutMillis;

    private final SocketChannelInputStream in;
    private final InterruptableInputStream bufferedIn;

    private final SocketChannelOutputStream out;
    private final InterruptableOutputStream bufferedOut;

    /**
     * Connects to the given endpoint and switches the channel to non-blocking mode.
     *
     * @param hostname the remote host
     * @param port the remote port
     * @throws IOException if connecting or configuring the channel fails; the
     *         channel is closed before the exception propagates
     */
    public StandardCommsSession(final String hostname, final int port) throws IOException {
        socketChannel = SocketChannel.open(new InetSocketAddress(hostname, port));
        try {
            socketChannel.configureBlocking(false);
        } catch (final IOException | RuntimeException e) {
            // The caller never receives this object, so nobody else could close the socket.
            try {
                socketChannel.close();
            } catch (final IOException ignored) {
                // the original failure is the one worth reporting
            }
            throw e;
        }

        in = new SocketChannelInputStream(socketChannel);
        bufferedIn = new InterruptableInputStream(new BufferedInputStream(in));

        out = new SocketChannelOutputStream(socketChannel);
        bufferedOut = new InterruptableOutputStream(new BufferedOutputStream(out));

        this.hostname = hostname;
        this.port = port;
    }

    @Override
    public void interrupt() {
        bufferedIn.interrupt();
        bufferedOut.interrupt();
    }

    @Override
    public void close() throws IOException {
        socketChannel.close();
    }

    /**
     * Applies the timeout to both channel streams and records it for
     * {@link #getTimeout(TimeUnit)}. The streams take an {@code int} number of
     * milliseconds, so larger values are clamped to {@link Integer#MAX_VALUE}
     * instead of wrapping around in a narrowing cast.
     */
    @Override
    public void setTimeout(final long value, final TimeUnit timeUnit) {
        final long millis = TimeUnit.MILLISECONDS.convert(value, timeUnit);
        final int streamMillis = (int) Math.min(millis, Integer.MAX_VALUE);
        in.setTimeout(streamMillis);
        out.setTimeout(streamMillis);
        timeoutMillis = millis;
    }

    @Override
    public InputStream getInputStream() throws IOException {
        return bufferedIn;
    }

    @Override
    public OutputStream getOutputStream() throws IOException {
        return bufferedOut;
    }

    /**
     * Reports the session as closed when the channel is no longer open or
     * connected, or when probing the input stream fails; in the latter case the
     * channel is closed as a side effect.
     */
    @Override
    public boolean isClosed() {
        if (!socketChannel.isOpen() || !socketChannel.isConnected()) {
            return true;
        }

        try {
            // Only the failure matters here; the returned value is ignored.
            this.in.isDataAvailable();
            return false;
        } catch (final IOException e) {
            try {
                close();
            } catch (final IOException ignored) {
                // the session is reported closed regardless
            }
            return true;
        }
    }

    @Override
    public String getHostname() {
        return hostname;
    }

    @Override
    public int getPort() {
        return port;
    }

    /** @return always {@code null}: this session is not encrypted */
    @Override
    public SSLContext getSSLContext() {
        return null;
    }

    @Override
    public long getTimeout(final TimeUnit timeUnit) {
        return timeUnit.convert(timeoutMillis, TimeUnit.MILLISECONDS);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService deleted file mode 100644 index a91f7ee..0000000 --- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService -org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService/index.html ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService/index.html b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService/index.html deleted file mode 100644 index d5f3595..0000000 --- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService/index.html +++ /dev/null @@ -1,78 +0,0 @@ -<!DOCTYPE html> -<html lang="en"> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. 
---> -<head> -<meta charset="utf-8" /> -<title>Distributed Map Cache Client Service</title> -<link rel="stylesheet" href="../../css/component-usage.css" type="text/css" /> -</head> - -<body> - <h2>Description:</h2> - - <p>A Controller Service that can be used to communicate with a - Distributed Map Cache Server.</p> - - - - <p> - <strong>Properties:</strong> - </p> - <p>In the list below, the names of required properties appear - in bold. Any other properties (not in bold) are considered optional. - If a property has a default value, it is indicated. If a property - supports the use of the NiFi Expression Language (or simply, - "expression language"), that is also indicated.</p> - - <ul> - <li><strong>Server Hostname</strong> - <ul> - <li>The name of the server that is running the DistributedMapCacheServer service</li> - <li>Default value: no default</li> - <li>Supports expression language: false</li> - </ul></li> - <li><strong>Server Port</strong> - <ul> - <li>The port on the remote server that is to be used when communicating with the - <a href="../org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer/index.html">DistributedMapCacheServer</a> service</li> - - <li>Default value: 4557</li> - <li>Supports expression language: false</li> - </ul></li> - <li>SSL Context Service - <ul> - <li>If specified, indicates the SSL Context Service that is used to communicate with the remote server. 
If not specified, communications will not be encrypted</li> - <li>Default value: no default</li> - <li>Supports expression language: false</li> - </ul></li> - <li><strong>Communications Timeout</strong> - <ul> - <li>Specifies how long to wait when communicating with the remote server before determining that there is a communications failure if data cannot be sent or received</li> - <li>Default value: 30 secs</li> - <li>Supports expression language: false</li> - </ul></li> - - </ul> - - - <i>See Also:</i> - <ul> - <li><a href="../org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer/index.html">Distributed Map Cache Server</a></li> - <li><a href="../org.apache.nifi.ssl.StandardSSLContextService/index.html">Standard SSL Context Service</a></li> - </ul> - -</body> -</html> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService/index.html ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService/index.html b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService/index.html deleted file mode 100755 index e69de29..0000000 http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/pom.xml ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/pom.xml b/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/pom.xml deleted file mode 100644 index bc612ae..0000000 
--- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/pom.xml +++ /dev/null @@ -1,39 +0,0 @@ -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. 
---> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.nifi</groupId> - <artifactId>distributed-cache-services-bundle</artifactId> - <version>0.0.1-SNAPSHOT</version> - </parent> - - <artifactId>distributed-cache-protocol</artifactId> - <name>Distributed Cache Protocol</name> - - <description> - Defines the communications protocol that is used between clients and servers - for the Distributed Cache services - </description> - - - <dependencies> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>remote-communications-utils</artifactId> - </dependency> - </dependencies> - -</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java deleted file mode 100644 index da2acad..0000000 --- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.distributed.cache.protocol;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;

import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException;
import org.apache.nifi.remote.VersionNegotiator;

/**
 * Static helpers for the opening handshake of the Distributed Cache protocol.
 * The initiating side sends {@link #MAGIC_HEADER} followed by a proposed
 * protocol version; the receiving side answers with one of the status codes
 * defined here until both sides settle on a version or the receiver aborts.
 */
public class ProtocolHandshake {

    /** Bytes written by the initiator at the very start of the handshake. */
    public static final byte[] MAGIC_HEADER = new byte[] { 'N', 'i', 'F', 'i' };

    /** Status: the proposed version was accepted. */
    public static final int RESOURCE_OK = 20;
    /** Status: the proposed version was rejected; the receiver's preferred version follows as an int. */
    public static final int DIFFERENT_RESOURCE_VERSION = 21;
    /** Status: no acceptable version exists; an explanation follows as a UTF string. */
    public static final int ABORT = 255;

    private static final String NO_ACCEPTABLE_VERSION =
            "Unable to negotiate an acceptable version of the Distributed Cache Protocol";

    /**
     * Performs the initiating side of the handshake: writes the magic header
     * and negotiates a protocol version with the remote side. The output
     * stream is flushed before returning, whether or not negotiation succeeded.
     *
     * @param in stream from which the remote side's responses are read
     * @param out stream to which the header and version proposals are written
     * @param versionNegotiator supplies the proposed version and receives the agreed one
     * @throws IOException if reading from or writing to the streams fails
     * @throws HandshakeException if no version can be agreed on or the remote side aborts
     */
    public static void initiateHandshake(final InputStream in, final OutputStream out, final VersionNegotiator versionNegotiator) throws IOException, HandshakeException {
        final DataInputStream dis = new DataInputStream(in);
        final DataOutputStream dos = new DataOutputStream(out);

        try {
            dos.write(MAGIC_HEADER);

            initiateVersionNegotiation(versionNegotiator, dis, dos);
        } finally {
            dos.flush();
        }
    }

    /**
     * Performs the receiving side of the handshake: reads and verifies the
     * magic header, then negotiates a protocol version with the initiator.
     * The output stream is flushed before returning, whether or not
     * negotiation succeeded.
     *
     * @param in stream from which the header and version proposals are read
     * @param out stream to which status responses are written
     * @param versionNegotiator decides which versions are acceptable and receives the agreed one
     * @throws IOException if reading from or writing to the streams fails
     * @throws HandshakeException if the header is not {@link #MAGIC_HEADER} or no version can be agreed on
     */
    public static void receiveHandshake(final InputStream in, final OutputStream out, final VersionNegotiator versionNegotiator) throws IOException, HandshakeException {
        final DataInputStream dis = new DataInputStream(in);
        final DataOutputStream dos = new DataOutputStream(out);

        try {
            // Size the buffer from the constant so the two can never drift apart.
            final byte[] magicHeaderBuffer = new byte[MAGIC_HEADER.length];
            dis.readFully(magicHeaderBuffer);

            // Reject peers that do not speak this protocol instead of interpreting
            // whatever bytes follow as a version number.
            if (!Arrays.equals(MAGIC_HEADER, magicHeaderBuffer)) {
                throw new HandshakeException("Handshake did not begin with the expected magic header");
            }

            receiveVersionNegotiation(versionNegotiator, dis, dos);
        } finally {
            dos.flush();
        }
    }

    /**
     * Proposes the negotiator's current version and reacts to the remote
     * side's status code, retrying with a lower version when asked to.
     */
    private static void initiateVersionNegotiation(final VersionNegotiator negotiator, final DataInputStream dis, final DataOutputStream dos) throws IOException, HandshakeException {
        // Propose our current version (only the version number is sent).
        dos.writeInt(negotiator.getVersion());
        dos.flush();

        // Wait for the remote side's verdict.
        final int statusCode = dis.read();
        switch (statusCode) {
            case RESOURCE_OK: // remote side accepted the proposed version
                return;
            case DIFFERENT_RESOURCE_VERSION: // remote side rejected the version and tells us what it prefers
                final int remotePreferred = dis.readInt();

                // Pick our best version that is no greater than the remote side's preference.
                final Integer newPreference = negotiator.getPreferredVersion(remotePreferred);
                // If there is no such version, fail now.
                if (newPreference == null) {
                    throw new HandshakeException("Could not agree on protocol version");
                }

                negotiator.setVersion(newPreference);

                // Try again with the new proposal. The return is essential: without it
                // control falls through into the ABORT case and a successful
                // renegotiation ends in a bogus readUTF() and exception.
                initiateVersionNegotiation(negotiator, dis, dos);
                return;
            case ABORT:
                throw new HandshakeException("Remote destination aborted connection with message: " + dis.readUTF());
            default:
                throw new HandshakeException("Received unexpected response code " + statusCode + " when negotiating version with remote server");
        }
    }

    /**
     * Reads a proposed version and either accepts it, counters with a
     * preferred version and waits for the next proposal, or aborts.
     */
    private static void receiveVersionNegotiation(final VersionNegotiator negotiator, final DataInputStream dis, final DataOutputStream dos) throws IOException, HandshakeException {
        final int proposedVersion = dis.readInt();
        if (negotiator.isVersionSupported(proposedVersion)) {
            dos.write(RESOURCE_OK);
            dos.flush();

            negotiator.setVersion(proposedVersion);
            return;
        }

        final Integer preferred = negotiator.getPreferredVersion(proposedVersion);
        if (preferred == null) {
            dos.write(ABORT);
            // The initiating side always reads a UTF string after ABORT, so send the
            // explanation rather than leaving it waiting on a message that never comes.
            dos.writeUTF(NO_ACCEPTABLE_VERSION);
            dos.flush();
            throw new HandshakeException(NO_ACCEPTABLE_VERSION);
        }

        dos.write(DIFFERENT_RESOURCE_VERSION);
        dos.writeInt(preferred);
        dos.flush();

        // Wait for the initiator's next proposal.
        receiveVersionNegotiation(negotiator, dis, dos);
    }
}