This is an automated email from the ASF dual-hosted git repository.

karp pushed a commit to branch snapshot-1.0.4 in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
commit 93d7f7753a5c03a25c93388d4d3cb34e2eaa79a9 Author: 维章 <[email protected]> AuthorDate: Mon May 23 10:45:02 2022 +0800 ConfigurableComponent insert and query --- .../client/transform/window/WindowInfo.java | 3 +- .../rocketmq/streams/client/DBDriverTest.java | 27 ++----- .../rocketmq/streams/client/ORMUtilTest.java | 84 -------------------- .../org/apache/rocketmq/streams/client/Person.java | 89 ++++++++++++++++++++++ .../rocketmq/streams/common/utils/ReflectUtil.java | 2 + 5 files changed, 97 insertions(+), 108 deletions(-) diff --git a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/window/WindowInfo.java b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/window/WindowInfo.java index e43e0943..73b543b3 100644 --- a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/window/WindowInfo.java +++ b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/window/WindowInfo.java @@ -27,8 +27,7 @@ public class WindowInfo { public static int HOPPING_WINDOW = 1;//滑动窗口 public static int TUMBLING_WINDOW = 2;//滚动窗口 public static int SESSION_WINDOW = 3; - public static int OVER_WINDOW = 4; - public static int SHUFFLE_OVER_WINDOW = 5; + protected int type;//window类型 hopping,Tumbling protected Time windowSize;//窗口大小 protected Time windowSlide;//滑动大小 diff --git a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DBDriverTest.java b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DBDriverTest.java index 0894c873..8d6ada4c 100644 --- a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DBDriverTest.java +++ b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DBDriverTest.java @@ -17,42 +17,25 @@ package org.apache.rocketmq.streams.client; -import org.apache.rocketmq.streams.common.component.ComponentCreator; -import 
org.apache.rocketmq.streams.common.configure.ConfigureFileKey; import org.apache.rocketmq.streams.configurable.ConfigurableComponent; -import org.apache.rocketmq.streams.configurable.model.Configure; -import org.apache.rocketmq.streams.db.driver.DriverBuilder; import org.junit.Test; import static junit.framework.TestCase.assertNotNull; -/** - * 数据库的存储,需要配置存储的连接参数,请先完成配置,后执行单元用例 如果未建表,可以通过Configure.createTableSQL() 获取建表语句,创建表后,测试 - */ + public class DBDriverTest { - private String URL = ""; - protected String USER_NAME = ""; - protected String PASSWORD = ""; - protected String TABLE_NAME = "rocketmq_streams_configure_source"; @Test public void testDBConfigurableService() { String namespace = "streams.db.configurable"; - //正式使用时,在配置文件配置 - ComponentCreator.getProperties().put(ConfigureFileKey.CONNECT_TYPE, "DB"); - ComponentCreator.getProperties().put(ConfigureFileKey.JDBC_URL, URL);//数据库连接url - ComponentCreator.getProperties().put(ConfigureFileKey.JDBC_USERNAME, USER_NAME);//用户名 - ComponentCreator.getProperties().put(ConfigureFileKey.JDBC_PASSWORD, PASSWORD);//password - ComponentCreator.getProperties().put(ConfigureFileKey.JDBC_TABLE_NAME, TABLE_NAME); - - //如果表不存在,创建表 - String sql = (Configure.createTableSQL(TABLE_NAME)); - DriverBuilder.createDriver().execute(sql); - ConfigurableComponent configurableComponent = ConfigurableComponent.getInstance(namespace); + ConfigurableComponent configurableComponent = ConfigurableComponent.getInstance("2211"); configurableComponent.insert(createPerson(namespace)); configurableComponent.refreshConfigurable(namespace); Person person = configurableComponent.queryConfigurable("person", "peronName"); + System.out.println(person.getName()); + System.out.println(person.getAge()); + assertNotNull(person); } diff --git a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/ORMUtilTest.java b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/ORMUtilTest.java index 9b649d5f..ab2961a0 100644 --- 
a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/ORMUtilTest.java +++ b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/ORMUtilTest.java @@ -86,87 +86,3 @@ public class ORMUtilTest { } -class Person extends BasedConfigurable { - @ENVDependence - private String name; - private int age; - private Boolean isMale; - private List<String> addresses; - private Map<String, Integer> childName2Age; - - public static Person createPerson(String namespace) { - Person person = new Person(); - person.setNameSpace(namespace); - person.setType("person"); - person.setConfigureName("Chris"); - person.setName("Chris"); - List<String> addresses = new ArrayList<>(); - addresses.add("huilongguan"); - addresses.add("shangdi"); - person.setAddresses(addresses); - Map<String, Integer> childName2Age = new HashMap<>(); - childName2Age.put("yuanyahan", 8); - childName2Age.put("yuanruxi", 4); - person.setChildName2Age(childName2Age); - person.setMale(true); - person.setAge(18); - return person; - } - - @Override - public String toString() { - return "org.apache.rocketmq.streams.Person{" + "name='" + name + '\'' + ", age=" + age + ", isMale=" + isMale + ", addresses=" + addresses - + ", childName2Age=" + childName2Age + '}'; - } - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public int getAge() { - return age; - } - - public void setAge(int age) { - this.age = age; - } - - public Boolean getMale() { - return isMale; - } - - public void setMale(Boolean male) { - isMale = male; - } - - public List<String> getAddresses() { - return addresses; - } - - public void setAddresses(List<String> addresses) { - this.addresses = addresses; - } - - public Map<String, Integer> getChildName2Age() { - return childName2Age; - } - - public void setChildName2Age(Map<String, Integer> childName2Age) { - this.childName2Age = childName2Age; - } - - @Override - public Object clone() { - Person person 
= null; - try { - person = (Person)super.clone(); - } catch (CloneNotSupportedException e) { - System.out.println("clone error " + e); - } - return person; - } -} diff --git a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/Person.java b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/Person.java new file mode 100644 index 00000000..780d7d9d --- /dev/null +++ b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/Person.java @@ -0,0 +1,89 @@ +package org.apache.rocketmq.streams.client; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import org.apache.rocketmq.streams.common.configurable.BasedConfigurable; +import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence; + +import java.util.List; +import java.util.Map; + +public class Person extends BasedConfigurable { + @ENVDependence + private String name; + private int age; + private Boolean isMale; + private List<String> addresses; + private Map<String, Integer> childName2Age; + + @Override + public String toString() { + return "org.apache.rocketmq.streams.Person{" + "name='" + name + '\'' + ", age=" + age + ", isMale=" + isMale + ", addresses=" + addresses + + ", childName2Age=" + childName2Age + '}'; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public int getAge() { + return age; + } + + public void setAge(int age) { + this.age = age; + } + + public Boolean getMale() { + return isMale; + } + + public void setMale(Boolean male) { + isMale = male; + } + + public List<String> getAddresses() { + return addresses; + } + + public void setAddresses(List<String> addresses) { + this.addresses = addresses; + } + + public Map<String, Integer> getChildName2Age() { + return childName2Age; + } + + public void setChildName2Age(Map<String, Integer> childName2Age) { + this.childName2Age = childName2Age; + } + + @Override + public Object clone() { + Person person = null; + try { + person = (Person) super.clone(); + } catch (CloneNotSupportedException e) { + System.out.println("clone error " + e); + } + return person; + } +} diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/ReflectUtil.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/ReflectUtil.java index 77f1612c..de5a356f 100644 --- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/ReflectUtil.java +++ 
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/ReflectUtil.java @@ -679,6 +679,7 @@ public class ReflectUtil { DataType dataType = DataTypeUtil.getDataTypeFromClass(fieldValue.getClass()); Object convertFieldValue = dataType.convert(fieldValue); if (method != null) { + method.setAccessible(true); method.invoke(object, convertFieldValue); } else { Field field = object.getClass().getDeclaredField(modelFieldName); @@ -753,6 +754,7 @@ public class ReflectUtil { if (method == null) { throw new RuntimeException("can not get " + fieldName + "'s value, the method is not exist"); } + method.setAccessible(true); return (T) method.invoke(bean); } catch (Exception e) { throw new RuntimeException("can not get " + fieldName + "'s value", e);
