This is an automated email from the ASF dual-hosted git repository.

namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git


The following commit(s) were added to refs/heads/master by this push:
     new a41c0cb  IGNITE-17311 Fixed KafkaToIgniteLoader instantiation several Spring contexts (#162)
a41c0cb is described below

commit a41c0cb26cec6fafff09114ef60559d683e063e2
Author: Nikita Amelchev <[email protected]>
AuthorDate: Thu Jul 7 12:25:37 2022 +0300

    IGNITE-17311 Fixed KafkaToIgniteLoader instantiation several Spring contexts (#162)
---
 .../ignite/cdc/kafka/KafkaToIgniteLoader.java      | 27 +++++++++++++---------
 .../ignite/cdc/kafka/KafkaToIgniteLoaderTest.java  | 22 ++++++++++++++++++
 .../kafka-to-ignite-initiation-context-test.xml    | 27 ++++++++++++++++++++++
 3 files changed, 65 insertions(+), 11 deletions(-)

diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoader.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoader.java
index adfc113..ae5affb 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoader.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoader.java
@@ -19,13 +19,15 @@ package org.apache.ignite.cdc.kafka;
 
 import java.net.URL;
 import java.util.Collection;
+import java.util.Map;
 import java.util.Properties;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import 
org.apache.ignite.internal.processors.resource.GridSpringResourceContext;
+import org.apache.ignite.internal.util.lang.GridTuple3;
 import org.apache.ignite.internal.util.spring.IgniteSpringHelper;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteBiTuple;
 
 import static org.apache.ignite.internal.IgniteComponentType.SPRING;
 
@@ -49,27 +51,30 @@ public class KafkaToIgniteLoader {
 
         IgniteSpringHelper spring = SPRING.create(false);
 
-        IgniteBiTuple<Collection<IgniteConfiguration>, ? extends 
GridSpringResourceContext> cfgTuple =
-            spring.loadConfigurations(cfgUrl);
+        GridTuple3<Map<String, ?>, Map<Class<?>, Collection>, ? extends 
GridSpringResourceContext> cfgTuple =
+            spring.loadBeans(cfgUrl, F.asList(KAFKA_PROPERTIES),
+                IgniteConfiguration.class, 
KafkaToIgniteCdcStreamerConfiguration.class);
 
-        if (cfgTuple.get1().size() > 1) {
+        Collection<IgniteConfiguration> ignCfg = 
cfgTuple.get2().get(IgniteConfiguration.class);
+
+        if (ignCfg.size() > 1) {
             throw new IgniteCheckedException(
-                "Exact 1 IgniteConfiguration should be defined. Found " + 
cfgTuple.get1().size()
+                "Exact 1 IgniteConfiguration should be defined. Found " + 
ignCfg.size()
             );
         }
 
-        IgniteBiTuple<Collection<KafkaToIgniteCdcStreamerConfiguration>, ? 
extends GridSpringResourceContext> k2iCfg =
-            spring.loadConfigurations(cfgUrl, 
KafkaToIgniteCdcStreamerConfiguration.class);
+        Collection<KafkaToIgniteCdcStreamerConfiguration> k2iCfg =
+            cfgTuple.get2().get(KafkaToIgniteCdcStreamerConfiguration.class);
 
-        if (k2iCfg.get1().size() > 1) {
+        if (k2iCfg.size() > 1) {
             throw new IgniteCheckedException(
                 "Exact 1 KafkaToIgniteCdcStreamerConfiguration configuration 
should be defined. " +
-                    "Found " + k2iCfg.get1().size()
+                    "Found " + k2iCfg.size()
             );
         }
 
-        Properties kafkaProps = spring.loadBean(cfgUrl, KAFKA_PROPERTIES);
+        Properties kafkaProps = 
(Properties)cfgTuple.get1().get(KAFKA_PROPERTIES);
 
-        return new KafkaToIgniteCdcStreamer(cfgTuple.get1().iterator().next(), 
kafkaProps, k2iCfg.get1().iterator().next());
+        return new KafkaToIgniteCdcStreamer(ignCfg.iterator().next(), 
kafkaProps, k2iCfg.iterator().next());
     }
 }
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoaderTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoaderTest.java
index 413d105..4fd818e 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoaderTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoaderTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.cdc.kafka;
 
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
@@ -47,4 +48,25 @@ public class KafkaToIgniteLoaderTest extends 
GridCommonAbstractTest {
 
         assertNotNull(streamer);
     }
+
+    /** */
+    @Test
+    public void testInitSpringContextOnce() throws Exception {
+        assertEquals(0, InitiationTestBean.initCnt.get());
+
+        
loadKafkaToIgniteStreamer("loader/kafka-to-ignite-initiation-context-test.xml");
+
+        assertEquals(1, InitiationTestBean.initCnt.get());
+    }
+
+    /** */
+    private static class InitiationTestBean {
+        /** */
+        static AtomicInteger initCnt = new AtomicInteger();
+
+        /** */
+        InitiationTestBean() {
+            initCnt.incrementAndGet();
+        }
+    }
 }
diff --git a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-initiation-context-test.xml b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-initiation-context-test.xml
new file mode 100644
index 0000000..cc1968a
--- /dev/null
+++ b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-initiation-context-test.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="
+        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
+    <import resource="kafka-to-ignite-correct.xml" />
+
+    <bean id="testBean" 
class="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.InitiationTestBean"/>
+</beans>

Reply via email to