This is an automated email from the ASF dual-hosted git repository.
namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git
The following commit(s) were added to refs/heads/master by this push:
new a41c0cb IGNITE-17311 Fixed KafkaToIgniteLoader instantiation several
Spring contexts (#162)
a41c0cb is described below
commit a41c0cb26cec6fafff09114ef60559d683e063e2
Author: Nikita Amelchev <[email protected]>
AuthorDate: Thu Jul 7 12:25:37 2022 +0300
IGNITE-17311 Fixed KafkaToIgniteLoader instantiation several Spring
contexts (#162)
---
.../ignite/cdc/kafka/KafkaToIgniteLoader.java | 27 +++++++++++++---------
.../ignite/cdc/kafka/KafkaToIgniteLoaderTest.java | 22 ++++++++++++++++++
.../kafka-to-ignite-initiation-context-test.xml | 27 ++++++++++++++++++++++
3 files changed, 65 insertions(+), 11 deletions(-)
diff --git
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoader.java
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoader.java
index adfc113..ae5affb 100644
---
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoader.java
+++
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoader.java
@@ -19,13 +19,15 @@ package org.apache.ignite.cdc.kafka;
import java.net.URL;
import java.util.Collection;
+import java.util.Map;
import java.util.Properties;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.configuration.IgniteConfiguration;
import
org.apache.ignite.internal.processors.resource.GridSpringResourceContext;
+import org.apache.ignite.internal.util.lang.GridTuple3;
import org.apache.ignite.internal.util.spring.IgniteSpringHelper;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteBiTuple;
import static org.apache.ignite.internal.IgniteComponentType.SPRING;
@@ -49,27 +51,30 @@ public class KafkaToIgniteLoader {
IgniteSpringHelper spring = SPRING.create(false);
- IgniteBiTuple<Collection<IgniteConfiguration>, ? extends
GridSpringResourceContext> cfgTuple =
- spring.loadConfigurations(cfgUrl);
+ GridTuple3<Map<String, ?>, Map<Class<?>, Collection>, ? extends
GridSpringResourceContext> cfgTuple =
+ spring.loadBeans(cfgUrl, F.asList(KAFKA_PROPERTIES),
+ IgniteConfiguration.class,
KafkaToIgniteCdcStreamerConfiguration.class);
- if (cfgTuple.get1().size() > 1) {
+ Collection<IgniteConfiguration> ignCfg =
cfgTuple.get2().get(IgniteConfiguration.class);
+
+ if (ignCfg.size() > 1) {
throw new IgniteCheckedException(
- "Exact 1 IgniteConfiguration should be defined. Found " +
cfgTuple.get1().size()
+ "Exact 1 IgniteConfiguration should be defined. Found " +
ignCfg.size()
);
}
- IgniteBiTuple<Collection<KafkaToIgniteCdcStreamerConfiguration>, ?
extends GridSpringResourceContext> k2iCfg =
- spring.loadConfigurations(cfgUrl,
KafkaToIgniteCdcStreamerConfiguration.class);
+ Collection<KafkaToIgniteCdcStreamerConfiguration> k2iCfg =
+ cfgTuple.get2().get(KafkaToIgniteCdcStreamerConfiguration.class);
- if (k2iCfg.get1().size() > 1) {
+ if (k2iCfg.size() > 1) {
throw new IgniteCheckedException(
"Exact 1 KafkaToIgniteCdcStreamerConfiguration configuration
should be defined. " +
- "Found " + k2iCfg.get1().size()
+ "Found " + k2iCfg.size()
);
}
- Properties kafkaProps = spring.loadBean(cfgUrl, KAFKA_PROPERTIES);
+ Properties kafkaProps =
(Properties)cfgTuple.get1().get(KAFKA_PROPERTIES);
- return new KafkaToIgniteCdcStreamer(cfgTuple.get1().iterator().next(),
kafkaProps, k2iCfg.get1().iterator().next());
+ return new KafkaToIgniteCdcStreamer(ignCfg.iterator().next(),
kafkaProps, k2iCfg.iterator().next());
}
}
diff --git
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoaderTest.java
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoaderTest.java
index 413d105..4fd818e 100644
---
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoaderTest.java
+++
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoaderTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.cdc.kafka;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
@@ -47,4 +48,25 @@ public class KafkaToIgniteLoaderTest extends
GridCommonAbstractTest {
assertNotNull(streamer);
}
+
+ /** */
+ @Test
+ public void testInitSpringContextOnce() throws Exception {
+ assertEquals(0, InitiationTestBean.initCnt.get());
+
+
loadKafkaToIgniteStreamer("loader/kafka-to-ignite-initiation-context-test.xml");
+
+ assertEquals(1, InitiationTestBean.initCnt.get());
+ }
+
+ /** */
+ private static class InitiationTestBean {
+ /** */
+ static AtomicInteger initCnt = new AtomicInteger();
+
+ /** */
+ InitiationTestBean() {
+ initCnt.incrementAndGet();
+ }
+ }
}
diff --git
a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-initiation-context-test.xml
b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-initiation-context-test.xml
new file mode 100644
index 0000000..cc1968a
--- /dev/null
+++
b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-initiation-context-test.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
+ <import resource="kafka-to-ignite-correct.xml" />
+
+ <bean id="testBean"
class="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.InitiationTestBean"/>
+</beans>