[GitHub] [kafka] C0urante commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects
C0urante commented on code in PR #13185: URL: https://github.com/apache/kafka/pull/13185#discussion_r1123470094 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java: ## @@ -243,7 +248,11 @@ public synchronized void requestTaskReconfiguration(String connName) { log.error("Task that requested reconfiguration does not exist: {}", connName); return; } -updateConnectorTasks(connName); +try { +updateConnectorTasks(connName); +} catch (Exception e) { +log.error("Unable to generate task configs for {}", connName, e); +} Review Comment: I don't think it's especially likely for connectors to continually invoke `requestTaskReconfiguration` given the automatic retry logic in distributed mode, and as of https://github.com/apache/kafka/pull/13276, the impact of ongoing retries for that operation is drastically reduced. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects
C0urante commented on code in PR #13185: URL: https://github.com/apache/kafka/pull/13185#discussion_r1123467255 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java: ## @@ -243,7 +248,11 @@ public synchronized void requestTaskReconfiguration(String connName) { log.error("Task that requested reconfiguration does not exist: {}", connName); return; } -updateConnectorTasks(connName); +try { +updateConnectorTasks(connName); +} catch (Exception e) { +log.error("Unable to generate task configs for {}", connName, e); +} Review Comment: Okay, a lot to unpack here! The more I think about it, the more I like the existing behavior for handling failures in task config generation. We automatically retry in distributed mode in order to absorb the risk of writing to the config topic or issuing a REST request to the leader, but since neither of those take place in standalone mode, it's fine to just throw the exception back to the caller (either a connector invoking `ConnectorContext::requestTaskReconfiguration`, or a REST API call to restart the connector) since the likeliest cause is a failed call to `Connector::taskConfigs` and automatic retries are less likely to be useful. I think we should basically just preserve existing behavior here, with the one exception of fixing how we handle failed calls to `requestTaskReconfiguration` that occur during a call to `restartConnector`. Right now we don't handle any of those and, IIUC, just cause the REST request to time out after 90 seconds. Instead of timing out, we should return a 500 response in that case. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects
C0urante commented on code in PR #13185: URL: https://github.com/apache/kafka/pull/13185#discussion_r1120754565 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java: ## @@ -243,7 +248,11 @@ public synchronized void requestTaskReconfiguration(String connName) { log.error("Task that requested reconfiguration does not exist: {}", connName); return; } -updateConnectorTasks(connName); +try { +updateConnectorTasks(connName); +} catch (Exception e) { +log.error("Unable to generate task configs for {}", connName, e); +} Review Comment: This is a change in behavior too, right? We no longer throw in `ConnectorContext::requestTaskReconfiguration` if we encounter any errors. This also seems reasonable (it aligns the behavior across standalone and distributed modes), but it does have consequences for the REST API, where restarting a connector no longer fails if we're unable to generate task configs for it (which is currently the case for both distributed and standalone modes). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects
C0urante commented on code in PR #13185: URL: https://github.com/apache/kafka/pull/13185#discussion_r1120754565 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java: ## @@ -243,7 +248,11 @@ public synchronized void requestTaskReconfiguration(String connName) { log.error("Task that requested reconfiguration does not exist: {}", connName); return; } -updateConnectorTasks(connName); +try { +updateConnectorTasks(connName); +} catch (Exception e) { +log.error("Unable to generate task configs for {}", connName, e); +} Review Comment: This is a change in behavior too, right? We no longer throw in `ConnectorContext::requestTaskReconfiguration` if we encounter any errors. This also seems reasonable (it aligns the behavior across standalone and distributed modes), but it does has consequences for the REST API, where restarting a connector no longer fails if we're unable to generate task configs for it (which is currently the case for both distributed and standalone modes). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects
C0urante commented on code in PR #13185: URL: https://github.com/apache/kafka/pull/13185#discussion_r1123318824 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/IsolatedPlugin.java: ## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime.isolation; + +import java.util.Objects; +import java.util.concurrent.Callable; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; + +public abstract class IsolatedPlugin { + +private final Plugins plugins; +private final Class pluginClass; +protected final P delegate; +private final ClassLoader classLoader; +private final PluginType type; + +IsolatedPlugin(Plugins plugins, P delegate, PluginType type) { +this.plugins = Objects.requireNonNull(plugins, "plugins must be non-null"); +this.delegate = Objects.requireNonNull(delegate, "delegate plugin must be non-null"); +this.pluginClass = delegate.getClass(); +ClassLoader classLoader = pluginClass.getClassLoader(); +this.classLoader = Objects.requireNonNull(classLoader, "delegate plugin must not be a boostrap class"); +this.type = Objects.requireNonNull(type, "plugin type must be non-null"); +} + +public PluginType type() { +return type; +} + +@SuppressWarnings("unchecked") +public Class pluginClass() { +return (Class) pluginClass; +} + +protected V isolate(Callable callable) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +return callable.call(); +} +} + +protected void isolateV(ThrowingRunnable runnable) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +runnable.run(); +} +} + +protected void isolateV(Consumer consumer, T t) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +consumer.accept(t); +} +} + +protected void isolateV(BiConsumer consumer, T t, U u) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +consumer.accept(t, u); +} +} + +protected R isolate(Function function, T t) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +return function.apply(t); +} +} + +protected R isolate(BiFunction function, T t, U u) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +return function.apply(t, u); +} +} Review Comment: Ah, I misunderstood--I thought you'd found a counterexample that couldn't be covered by the minimal set of `isolate`/`isolateV` variants, instead of one that couldn't be covered by the expanded set of variants originally present in the PR. Was really scratching my head over that one! Thanks for the changes, this class LGTM now 👍 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects
C0urante commented on code in PR #13185: URL: https://github.com/apache/kafka/pull/13185#discussion_r1120889645 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/IsolatedPlugin.java: ## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime.isolation; + +import java.util.Objects; +import java.util.concurrent.Callable; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; + +public abstract class IsolatedPlugin { + +private final Plugins plugins; +private final Class pluginClass; +protected final P delegate; +private final ClassLoader classLoader; +private final PluginType type; + +IsolatedPlugin(Plugins plugins, P delegate, PluginType type) { +this.plugins = Objects.requireNonNull(plugins, "plugins must be non-null"); +this.delegate = Objects.requireNonNull(delegate, "delegate plugin must be non-null"); +this.pluginClass = delegate.getClass(); +ClassLoader classLoader = pluginClass.getClassLoader(); +this.classLoader = Objects.requireNonNull(classLoader, "delegate plugin must not be a boostrap class"); +this.type = Objects.requireNonNull(type, "plugin type must be non-null"); +} + +public PluginType type() { +return type; +} + +@SuppressWarnings("unchecked") +public Class pluginClass() { +return (Class) pluginClass; +} + +protected V isolate(Callable callable) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +return callable.call(); +} +} + +protected void isolateV(ThrowingRunnable runnable) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +runnable.run(); +} +} + +protected void isolateV(Consumer consumer, T t) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +consumer.accept(t); +} +} + +protected void isolateV(BiConsumer consumer, T t, U u) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +consumer.accept(t, u); +} +} + +protected R isolate(Function function, T t) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +return function.apply(t); +} +} + +protected R isolate(BiFunction function, T t, U u) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +return function.apply(t, u); +} +} Review Comment: If it comes down to not being able to completely satisfy all use cases with just the `isolate(Callable)` (and possibly `isolateV(ThrowingRunnable)` methods, then I think it could be alright to add more variants. But the additional stack frames, and even ugly class names, don't seem likely to be super important to users or plugin developers. In both cases (though probably more often the latter), aren't they more likely to look for the first stack frame that originates with their plugin class? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects
C0urante commented on code in PR #13185: URL: https://github.com/apache/kafka/pull/13185#discussion_r1120887378 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/IsolatedPlugin.java: ## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime.isolation; + +import java.util.Objects; +import java.util.concurrent.Callable; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; + +public abstract class IsolatedPlugin { + +private final Plugins plugins; +private final Class pluginClass; +protected final P delegate; +private final ClassLoader classLoader; +private final PluginType type; + +IsolatedPlugin(Plugins plugins, P delegate, PluginType type) { +this.plugins = Objects.requireNonNull(plugins, "plugins must be non-null"); +this.delegate = Objects.requireNonNull(delegate, "delegate plugin must be non-null"); +this.pluginClass = delegate.getClass(); +ClassLoader classLoader = pluginClass.getClassLoader(); +this.classLoader = Objects.requireNonNull(classLoader, "delegate plugin must not be a boostrap class"); +this.type = Objects.requireNonNull(type, "plugin type must be non-null"); +} + +public PluginType type() { +return type; +} + +@SuppressWarnings("unchecked") +public Class pluginClass() { +return (Class) pluginClass; +} + +protected V isolate(Callable callable) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +return callable.call(); +} +} + +protected void isolateV(ThrowingRunnable runnable) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +runnable.run(); +} +} + +protected void isolateV(Consumer consumer, T t) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +consumer.accept(t); +} +} + +protected void isolateV(BiConsumer consumer, T t, U u) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +consumer.accept(t, u); +} +} + +protected R isolate(Function function, T t) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +return function.apply(t); +} +} + +protected R isolate(BiFunction function, T t, U u) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +return function.apply(t, u); +} +} Review Comment: I would be really interested in knowing which case can't be covered here. If you can dig up the `Converter` counterexample, would you mind sharing it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects
C0urante commented on code in PR #13185: URL: https://github.com/apache/kafka/pull/13185#discussion_r1120886025 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/IsolatedPlugin.java: ## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime.isolation; + +import java.util.Objects; +import java.util.concurrent.Callable; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; + +public abstract class IsolatedPlugin { + +private final Plugins plugins; +private final Class pluginClass; +protected final P delegate; +private final ClassLoader classLoader; +private final PluginType type; + +IsolatedPlugin(Plugins plugins, P delegate, PluginType type) { +this.plugins = Objects.requireNonNull(plugins, "plugins must be non-null"); +this.delegate = Objects.requireNonNull(delegate, "delegate plugin must be non-null"); +this.pluginClass = delegate.getClass(); +ClassLoader classLoader = pluginClass.getClassLoader(); +this.classLoader = Objects.requireNonNull(classLoader, "delegate plugin must not be a boostrap class"); +this.type = Objects.requireNonNull(type, "plugin type must be non-null"); +} + +public PluginType type() { +return type; +} + +@SuppressWarnings("unchecked") +public Class pluginClass() { +return (Class) pluginClass; +} + +protected V isolate(Callable callable) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +return callable.call(); +} +} + +protected void isolateV(ThrowingRunnable runnable) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +runnable.run(); +} +} + +protected void isolateV(Consumer consumer, T t) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +consumer.accept(t); +} +} + +protected void isolateV(BiConsumer consumer, T t, U u) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +consumer.accept(t, u); +} +} + +protected R isolate(Function function, T t) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +return function.apply(t); +} +} + +protected R isolate(BiFunction function, T t, U u) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +return function.apply(t, u); +} +} + +public interface ThrowingRunnable { +void run() throws Exception; +} + +@Override +public int hashCode() { +try { +return isolate(delegate::hashCode); +} catch (Throwable e) { +throw new RuntimeException("unable to evaluate plugin hashCode", e); +} +} + +@Override +public boolean equals(Object obj) { +if (obj == null || this.getClass() != obj.getClass()) { +return false; +} Review Comment: Given that equality across `Connector` instances is likely not a developer priority, I think you're right regarding reference quality for delegates. Good point 👍 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects
C0urante commented on code in PR #13185: URL: https://github.com/apache/kafka/pull/13185#discussion_r1120685793 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/IsolatedPlugin.java: ## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime.isolation; + +import java.util.Objects; +import java.util.concurrent.Callable; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; + +public abstract class IsolatedPlugin { + +private final Plugins plugins; +private final Class pluginClass; +protected final P delegate; +private final ClassLoader classLoader; +private final PluginType type; + +IsolatedPlugin(Plugins plugins, P delegate, PluginType type) { +this.plugins = Objects.requireNonNull(plugins, "plugins must be non-null"); +this.delegate = Objects.requireNonNull(delegate, "delegate plugin must be non-null"); +this.pluginClass = delegate.getClass(); +ClassLoader classLoader = pluginClass.getClassLoader(); +this.classLoader = Objects.requireNonNull(classLoader, "delegate plugin must not be a boostrap class"); +this.type = Objects.requireNonNull(type, "plugin type must be non-null"); +} + +public PluginType type() { +return type; +} + +@SuppressWarnings("unchecked") +public Class pluginClass() { +return (Class) pluginClass; +} + +protected V isolate(Callable callable) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +return callable.call(); +} +} + +protected void isolateV(ThrowingRunnable runnable) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +runnable.run(); +} Review Comment: We can reduce duplication here by piggybacking off of the non-void `isolate` method: ```suggestion isolate(() -> { runnable.run(); return null; }); ``` ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java: ## @@ -419,8 +420,14 @@ public ConnectorMetricsGroup(ConnectMetrics connectMetrics, AbstractStatus.State metricGroup.close(); metricGroup.addImmutableValueMetric(registry.connectorType, connectorType()); -metricGroup.addImmutableValueMetric(registry.connectorClass, connector.getClass().getName()); -metricGroup.addImmutableValueMetric(registry.connectorVersion, connector.version()); +metricGroup.addImmutableValueMetric(registry.connectorClass, connector.pluginClass().getName()); +String version; +try { +version = connector.version(); +} catch (Exception e) { +version = DelegatingClassLoader.UNDEFINED_VERSION; +} Review Comment: This technically changes behavior, right? We go from failing the connector on startup to just assigning it an undefined version. I think this is probably fine (the benefits outweigh the costs, and I have a hard time imagining any use case that would favor failing here), just want to make sure I'm gauging the impact here correctly. ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/IsolatedPlugin.java: ## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, eith