KAFKA-3611: Remove warnings when using reflections ewencp granders Can you take a look? Thanks!
Author: Liquan Pei <[email protected]> Reviewers: Ewen Cheslack-Postava <[email protected]> Closes #1259 from Ishiihara/fix-warning Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/316389d6 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/316389d6 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/316389d6 Branch: refs/heads/0.10.0 Commit: 316389d6adfb1398e30ca2ce5d586ea94d3f3110 Parents: 57831a5 Author: Liquan Pei <[email protected]> Authored: Thu Apr 28 11:59:02 2016 -0700 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Thu Apr 28 11:59:02 2016 -0700 ---------------------------------------------------------------------- bin/kafka-run-class.sh | 6 +- checkstyle/import-control.xml | 1 + .../kafka/connect/runtime/AbstractHerder.java | 10 ++- .../kafka/connect/util/ReflectionsUtil.java | 90 ++++++++++++++++++++ .../resources/ConnectorPluginsResourceTest.java | 3 +- 5 files changed, 104 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/316389d6/bin/kafka-run-class.sh ---------------------------------------------------------------------- diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh index f45d8d4..88d43be 100755 --- a/bin/kafka-run-class.sh +++ b/bin/kafka-run-class.sh @@ -34,7 +34,11 @@ fi shopt -s nullglob for dir in $base_dir/core/build/dependant-libs-${SCALA_VERSION}*; do - CLASSPATH=$CLASSPATH:$dir/* + if [ -z $CLASSPATH ] ; then + CLASSPATH=$dir/* + else + CLASSPATH=$CLASSPATH:$dir/* + fi done for file in $base_dir/examples/build/libs//kafka-examples*.jar; http://git-wip-us.apache.org/repos/asf/kafka/blob/316389d6/checkstyle/import-control.xml ---------------------------------------------------------------------- diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 39d4ca3..7a45515 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -221,6 +221,7 @@ <subpackage name="util"> <allow pkg="org.apache.kafka.connect" /> + <allow pkg="org.reflections.vfs" /> <!-- for annotations to avoid code duplication --> <allow pkg="com.fasterxml.jackson.annotation" /> </subpackage> http://git-wip-us.apache.org/repos/asf/kafka/blob/316389d6/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index bd73589..83f56e2 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -34,6 +34,7 @@ import org.apache.kafka.connect.storage.StatusBackingStore; import org.apache.kafka.connect.tools.VerifiableSinkConnector; import org.apache.kafka.connect.tools.VerifiableSourceConnector; import org.apache.kafka.connect.util.ConnectorTaskId; +import org.apache.kafka.connect.util.ReflectionsUtil; import org.reflections.Reflections; import org.reflections.util.ClasspathHelper; import org.reflections.util.ConfigurationBuilder; @@ -82,11 +83,10 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con protected final ConfigBackingStore configBackingStore; private Map<String, Connector> tempConnectors = new ConcurrentHashMap<>(); - private static final List<Class<? extends Connector>> SKIPPED_CONNECTORS = Arrays.<Class<? extends Connector>>asList(VerifiableSourceConnector.class, VerifiableSinkConnector.class); private static List<ConnectorPluginInfo> validConnectorPlugins; private static final Object LOCK = new Object(); private Thread classPathTraverser; - + private static final List<Class<? extends Connector>> EXCLUDES = Arrays.<Class<? extends Connector>>asList(VerifiableSourceConnector.class, VerifiableSinkConnector.class); public AbstractHerder(Worker worker, String workerId, @@ -263,10 +263,12 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con if (validConnectorPlugins != null) { return validConnectorPlugins; } + ReflectionsUtil.registerUrlTypes(); + ConfigurationBuilder builder = new ConfigurationBuilder().setUrls(ClasspathHelper.forJavaClassPath()); + Reflections reflections = new Reflections(builder); - Reflections reflections = new Reflections(new ConfigurationBuilder().setUrls(ClasspathHelper.forJavaClassPath())); Set<Class<? extends Connector>> connectorClasses = reflections.getSubTypesOf(Connector.class); - connectorClasses.removeAll(SKIPPED_CONNECTORS); + connectorClasses.removeAll(EXCLUDES); List<ConnectorPluginInfo> connectorPlugins = new LinkedList<>(); for (Class<? extends Connector> connectorClass : connectorClasses) { int mod = connectorClass.getModifiers(); http://git-wip-us.apache.org/repos/asf/kafka/blob/316389d6/connect/runtime/src/main/java/org/apache/kafka/connect/util/ReflectionsUtil.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ReflectionsUtil.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ReflectionsUtil.java new file mode 100644 index 0000000..fc3a0dd --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ReflectionsUtil.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + **/ +package org.apache.kafka.connect.util; + +import org.reflections.vfs.Vfs; +import org.reflections.vfs.Vfs.Dir; +import org.reflections.vfs.Vfs.File; +import org.reflections.vfs.Vfs.UrlType; + +import java.net.URL; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; + +/** + * CLASSPATH on OSX contains .mar and .jnilib file extensions. Vfs used by Reflections does not recognize + * urls with those extensions and log WARNs when scan them. Those WARNs can be eliminated by registering + * URL types before using reflection. + */ +public class ReflectionsUtil { + + private static final String FILE_PROTOCOL = "file"; + private static final List<String> ENDINGS = Arrays.asList(".mar", ".jnilib", "*"); + + public static void registerUrlTypes() { + final List<UrlType> urlTypes = new LinkedList<>(); + urlTypes.add(new EmptyUrlType(ENDINGS)); + urlTypes.addAll(Arrays.asList(Vfs.DefaultUrlTypes.values())); + Vfs.setDefaultURLTypes(urlTypes); + } + + private static class EmptyUrlType implements UrlType { + + private final List<String> endings; + + private EmptyUrlType(final List<String> endings) { + this.endings = endings; + } + + public boolean matches(URL url) { + final String protocol = url.getProtocol(); + final String externalForm = url.toExternalForm(); + if (!protocol.equals(FILE_PROTOCOL)) { + return false; + } + for (String ending : endings) { + if (externalForm.endsWith(ending)) { + return true; + } + } + return false; + } + + public Dir createDir(final URL url) throws Exception { + return emptyVfsDir(url); + } + + private static Dir emptyVfsDir(final URL url) { + return new Dir() { + @Override + public String getPath() { + return url.toExternalForm(); + } + + @Override + public Iterable<File> getFiles() { + return Collections.emptyList(); + } + + @Override + public void close() { + + } + }; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/316389d6/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java index 732db3d..241d331 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java @@ -21,8 +21,8 @@ import com.fasterxml.jackson.core.type.TypeReference; import org.apache.kafka.common.config.Config; import org.apache.kafka.common.config.ConfigDef; -import org.apache.kafka.common.config.ConfigDef.Recommender; import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Recommender; import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.config.ConfigDef.Width; import org.apache.kafka.connect.connector.Connector; @@ -149,6 +149,7 @@ public class ConnectorPluginsResourceTest { assertTrue(connectorPlugins.contains(new ConnectorPluginInfo(ConnectorPluginsResourceTestConnector.class.getCanonicalName()))); } + /* Name here needs to be unique as we are testing the aliasing mechanism */ public static class ConnectorPluginsResourceTestConnector extends Connector {
