This is an automated email from the ASF dual-hosted git repository. zhouxj pushed a commit to branch feature/GEODE-6518 in repository https://gitbox.apache.org/repos/asf/geode.git
commit d148855504f0f565e2c62f1b9b584158746dd021 Author: zhouxh <gz...@pivotal.io> AuthorDate: Thu Mar 14 17:34:48 2019 -0700 GEODE-6518: for proxy region, should not add loader and writer for jdbc-mapping --- ...reateMappingCommandForProxyRegionDUnitTest.java | 434 +++++++++++++++++++++ .../jdbc/internal/cli/CreateMappingFunction.java | 16 +- 2 files changed, 448 insertions(+), 2 deletions(-) diff --git a/geode-connectors/src/distributedTest/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandForProxyRegionDUnitTest.java b/geode-connectors/src/distributedTest/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandForProxyRegionDUnitTest.java new file mode 100644 index 0000000..ee57839 --- /dev/null +++ b/geode-connectors/src/distributedTest/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandForProxyRegionDUnitTest.java @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.connectors.jdbc.internal.cli; + +import static org.apache.geode.connectors.jdbc.internal.cli.CreateMappingCommand.CREATE_MAPPING; +import static org.apache.geode.connectors.util.internal.MappingConstants.DATA_SOURCE_NAME; +import static org.apache.geode.connectors.util.internal.MappingConstants.GROUP_NAME; +import static org.apache.geode.connectors.util.internal.MappingConstants.ID_NAME; +import static org.apache.geode.connectors.util.internal.MappingConstants.PDX_NAME; +import static org.apache.geode.connectors.util.internal.MappingConstants.REGION_NAME; +import static org.apache.geode.connectors.util.internal.MappingConstants.SCHEMA_NAME; +import static org.apache.geode.connectors.util.internal.MappingConstants.TABLE_NAME; +import static org.assertj.core.api.Assertions.assertThat; + +import java.sql.Connection; +import java.sql.JDBCType; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Arrays; +import java.util.List; + +import javax.sql.DataSource; + +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.geode.cache.Region; +import org.apache.geode.cache.asyncqueue.AsyncEventQueue; +import org.apache.geode.cache.configuration.CacheConfig; +import org.apache.geode.cache.configuration.RegionAttributesType; +import org.apache.geode.cache.configuration.RegionConfig; +import org.apache.geode.connectors.jdbc.JdbcAsyncWriter; +import org.apache.geode.connectors.jdbc.JdbcLoader; +import org.apache.geode.connectors.jdbc.JdbcWriter; +import org.apache.geode.connectors.jdbc.internal.JdbcConnectorService; +import org.apache.geode.connectors.jdbc.internal.configuration.FieldMapping; +import org.apache.geode.connectors.jdbc.internal.configuration.RegionMapping; +import org.apache.geode.connectors.util.internal.MappingCommandUtils; +import org.apache.geode.distributed.internal.InternalLocator; +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.cache.LocalRegion; +import org.apache.geode.internal.cache.PartitionedRegion; +import org.apache.geode.internal.jndi.JNDIInvoker; +import org.apache.geode.management.internal.cli.util.CommandStringBuilder; +import org.apache.geode.pdx.FieldType; +import org.apache.geode.pdx.PdxReader; +import org.apache.geode.pdx.PdxSerializable; +import org.apache.geode.pdx.PdxWriter; +import org.apache.geode.test.dunit.rules.ClusterStartupRule; +import org.apache.geode.test.dunit.rules.MemberVM; +import org.apache.geode.test.junit.categories.JDBCConnectorTest; +import org.apache.geode.test.junit.rules.GfshCommandRule; +import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder; +import org.apache.geode.test.junit.rules.serializable.SerializableTestName; + +@Category({JDBCConnectorTest.class}) +public class CreateMappingCommandForProxyRegionDUnitTest { + + private static final String TEST_REGION = "testRegion"; + private static final String TEST_GROUP1 = "testGroup1"; + private static final String TEST_GROUP2 = "testGroup2"; + + @Rule + public transient GfshCommandRule gfsh = new GfshCommandRule(); + + @Rule + public ClusterStartupRule startupRule = new ClusterStartupRule(); + + @Rule + public SerializableTestName testName = new SerializableTestName(); + + @Rule + public SerializableTemporaryFolder temporaryFolder = new SerializableTemporaryFolder(); + + private MemberVM locator; + private MemberVM server1; + private MemberVM server2; + + @Before + public void before() throws Exception { + locator = startupRule.startLocatorVM(0); + server1 = startupRule.startServerVM(1, TEST_GROUP1, locator.getPort()); + server2 = startupRule.startServerVM(2, TEST_GROUP2, locator.getPort()); + + gfsh.connectAndVerify(locator); + setupDatabase(); + } + + @After + public void after() throws Exception { + teardownDatabase(); + } + + private void setupDatabase() { + gfsh.executeAndAssertThat( + "create data-source --name=connection" + + " --pooled=false" + + " --url=\"jdbc:derby:memory:newDB;create=true\"") + .statusIsSuccess(); + executeSql( + "create table mySchema.myTable (myId varchar(10) primary key, name varchar(10))"); + } + + private void teardownDatabase() { + executeSql("drop table mySchema.myTable"); + } + + private void executeSql(String sql) { + for (MemberVM server : Arrays.asList(server1, server2)) { + server.invoke(() -> { + try { + DataSource ds = JNDIInvoker.getDataSource("connection"); + Connection conn = ds.getConnection(); + Statement sm = conn.createStatement(); + sm.execute(sql); + sm.close(); + conn.close(); + } catch (SQLException e) { + throw new RuntimeException(e); + } + }); + } + } + + private void setupPartition(String regionName) { + gfsh.executeAndAssertThat("create region --name=" + regionName + " --type=PARTITION") + .statusIsSuccess(); + } + + private void setupGroupPartition(String regionName, String groupNames, boolean isAccessor) { + gfsh.executeAndAssertThat( + "create region --name=" + regionName + (isAccessor ? " --type=PARTITION_PROXY" + : " --type=PARTITION") + " --groups=" + groupNames) + .statusIsSuccess(); + } + + private static RegionMapping getRegionMappingFromClusterConfig(String regionName, + String groups) { + CacheConfig cacheConfig = + InternalLocator.getLocator().getConfigurationPersistenceService().getCacheConfig(groups); + RegionConfig regionConfig = cacheConfig.getRegions().stream() + .filter(region -> region.getName().equals(convertRegionPathToName(regionName))).findFirst() + .orElse(null); + RegionMapping regionMapping = + (RegionMapping) regionConfig.getCustomRegionElements().stream() + .filter(element -> element instanceof RegionMapping).findFirst().orElse(null); + return regionMapping; + } + + private static RegionMapping getRegionMappingFromService(String regionName) { + return ClusterStartupRule.getCache().getService(JdbcConnectorService.class) + .getMappingForRegion(convertRegionPathToName(regionName)); + } + + private static void validateAsyncEventQueueCreatedInClusterConfig(String regionName, + String groups, + boolean isParallel) { + CacheConfig cacheConfig = + InternalLocator.getLocator().getConfigurationPersistenceService().getCacheConfig(groups); + List<CacheConfig.AsyncEventQueue> queueList = cacheConfig.getAsyncEventQueues(); + String queueName = MappingCommandUtils.createAsyncEventQueueName(regionName); + CacheConfig.AsyncEventQueue queue = findQueue(queueList, queueName); + assertThat(queue).isNotNull(); + assertThat(queue.getId()).isEqualTo(queueName); + assertThat(queue.getAsyncEventListener().getClassName()) + .isEqualTo(JdbcAsyncWriter.class.getName()); + assertThat(queue.isParallel()).isEqualTo(isParallel); + } + + private static CacheConfig.AsyncEventQueue findQueue( + List<CacheConfig.AsyncEventQueue> queueList, + String queueName) { + for (CacheConfig.AsyncEventQueue queue : queueList) { + if (queue.getId().equals(queueName)) { + return queue; + } + } + return null; + } + + private static String convertRegionPathToName(String regionPath) { + if (regionPath.startsWith("/")) { + return regionPath.substring(1); + } + return regionPath; + } + + private static void validateRegionAlteredInClusterConfig(String regionName, + String groups, + boolean synchronous) { + CacheConfig cacheConfig = + InternalLocator.getLocator().getConfigurationPersistenceService().getCacheConfig(groups); + RegionConfig regionConfig = cacheConfig.getRegions().stream() + .filter(region -> region.getName().equals(convertRegionPathToName(regionName))).findFirst() + .orElse(null); + RegionAttributesType attributes = regionConfig.getRegionAttributes(); + assertThat(attributes.getCacheLoader().getClassName()).isEqualTo(JdbcLoader.class.getName()); + if (synchronous) { + assertThat(attributes.getCacheWriter().getClassName()).isEqualTo(JdbcWriter.class.getName()); + } else { + String queueName = MappingCommandUtils.createAsyncEventQueueName(regionName); + assertThat(attributes.getAsyncEventQueueIds()).isEqualTo(queueName); + } + } + + private static void validateAsyncEventQueueCreatedOnServer(String regionName, + boolean isParallel) { + InternalCache cache = ClusterStartupRule.getCache(); + String queueName = MappingCommandUtils.createAsyncEventQueueName(regionName); + AsyncEventQueue queue = cache.getAsyncEventQueue(queueName); + assertThat(queue).isNotNull(); + assertThat(queue.getAsyncEventListener()).isInstanceOf(JdbcAsyncWriter.class); + assertThat(queue.isParallel()).isEqualTo(isParallel); + } + + private static void validateRegionAlteredOnServer(String regionName, boolean synchronous) { + InternalCache cache = ClusterStartupRule.getCache(); + Region<?, ?> region = cache.getRegion(regionName); + boolean isProxy = false; + if (region instanceof PartitionedRegion) { + isProxy = ((PartitionedRegion) region).getLocalMaxMemory() == 0; + } else { + isProxy = ((LocalRegion) region).isProxy(); + } + if (!isProxy) { + assertThat(region.getAttributes().getCacheLoader()).isInstanceOf(JdbcLoader.class); + } else { + assertThat(region.getAttributes().getCacheLoader()).isNull(); + } + if (synchronous) { + if (!isProxy) { + assertThat(region.getAttributes().getCacheWriter()).isInstanceOf(JdbcWriter.class); + } else { + assertThat(region.getAttributes().getCacheWriter()).isNull(); + } + } else { + String queueName = MappingCommandUtils.createAsyncEventQueueName(regionName); + assertThat(region.getAttributes().getAsyncEventQueueIds()).contains(queueName); + } + } + + @Test + public void createMappingTogetherForMultiServerGroupWithEmptyRegion() { + String regionName = "/" + TEST_REGION; + setupGroupPartition(regionName, TEST_GROUP1, false); + setupGroupPartition(regionName, TEST_GROUP2, true); + CommandStringBuilder csb = new CommandStringBuilder(CREATE_MAPPING); + csb.addOption(REGION_NAME, regionName); + csb.addOption(DATA_SOURCE_NAME, "connection"); + csb.addOption(TABLE_NAME, "myTable"); + csb.addOption(PDX_NAME, IdAndName.class.getName()); + csb.addOption(ID_NAME, "myId"); + csb.addOption(SCHEMA_NAME, "mySchema"); + csb.addOption(GROUP_NAME, TEST_GROUP1 + "," + TEST_GROUP2); + + gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess(); + + for (MemberVM server : Arrays.asList(server1, server2)) { + server.invoke(() -> { + RegionMapping mapping = getRegionMappingFromService(regionName); + assertValidMappingOnServer(mapping, regionName, false, true); + }); + } + + server1.invoke(() -> { + RegionMapping mapping = getRegionMappingFromService(regionName); + assertThat(mapping).isNotNull(); + }); + server2.invoke(() -> { + RegionMapping mapping = getRegionMappingFromService(regionName); + assertThat(mapping).isNotNull(); + }); + + locator.invoke(() -> { + RegionMapping regionMapping = getRegionMappingFromClusterConfig(regionName, TEST_GROUP1); + assertValidMappingOnLocator(regionMapping, regionName, TEST_GROUP1, false, true); + regionMapping = getRegionMappingFromClusterConfig(regionName, TEST_GROUP2); + assertValidMappingOnLocator(regionMapping, regionName, TEST_GROUP2, false, true); + }); + } + + @Test + public void createMappingSeparatelyTogetherForMultiServerGroupWithEmptyRegion() { + String regionName = "/" + TEST_REGION; + setupGroupPartition(regionName, TEST_GROUP1, false); + setupGroupPartition(regionName, TEST_GROUP2, true); + CommandStringBuilder csb = new CommandStringBuilder(CREATE_MAPPING); + csb.addOption(REGION_NAME, regionName); + csb.addOption(DATA_SOURCE_NAME, "connection"); + csb.addOption(TABLE_NAME, "myTable"); + csb.addOption(PDX_NAME, IdAndName.class.getName()); + csb.addOption(ID_NAME, "myId"); + csb.addOption(SCHEMA_NAME, "mySchema"); + csb.addOption(GROUP_NAME, TEST_GROUP1); + + gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess(); + + csb = new CommandStringBuilder(CREATE_MAPPING); + csb.addOption(REGION_NAME, regionName); + csb.addOption(DATA_SOURCE_NAME, "connection"); + csb.addOption(TABLE_NAME, "myTable"); + csb.addOption(PDX_NAME, IdAndName.class.getName()); + csb.addOption(ID_NAME, "myId"); + csb.addOption(SCHEMA_NAME, "mySchema"); + csb.addOption(GROUP_NAME, TEST_GROUP2); + + gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess(); + + for (MemberVM server : Arrays.asList(server1, server2)) { + server.invoke(() -> { + RegionMapping mapping = getRegionMappingFromService(regionName); + assertValidMappingOnServer(mapping, regionName, false, true); + }); + } + + server1.invoke(() -> { + RegionMapping mapping = getRegionMappingFromService(regionName); + assertThat(mapping).isNotNull(); + }); + server2.invoke(() -> { + RegionMapping mapping = getRegionMappingFromService(regionName); + assertThat(mapping).isNotNull(); + }); + + locator.invoke(() -> { + RegionMapping regionMapping = getRegionMappingFromClusterConfig(regionName, TEST_GROUP1); + assertValidMappingOnLocator(regionMapping, regionName, TEST_GROUP1, false, true); + regionMapping = getRegionMappingFromClusterConfig(regionName, TEST_GROUP2); + assertValidMappingOnLocator(regionMapping, regionName, TEST_GROUP2, false, true); + }); + } + + private static void assertValidMappingOnServer(RegionMapping mapping, String regionName, + boolean synchronous, boolean isParallel) { + assertValidMapping(mapping); + validateRegionAlteredOnServer(regionName, synchronous); + if (!synchronous) { + validateAsyncEventQueueCreatedOnServer(regionName, isParallel); + } + } + + private static void assertValidMappingOnLocator(RegionMapping mapping, String regionName, + String groups, + boolean synchronous, boolean isParallel) { + assertValidMapping(mapping); + validateRegionAlteredInClusterConfig(regionName, groups, synchronous); + if (!synchronous) { + validateAsyncEventQueueCreatedInClusterConfig(regionName, groups, isParallel); + } + } + + private static void assertValidMapping(RegionMapping mapping) { + assertThat(mapping.getDataSourceName()).isEqualTo("connection"); + assertThat(mapping.getTableName()).isEqualTo("myTable"); + assertThat(mapping.getPdxName()).isEqualTo(IdAndName.class.getName()); + assertThat(mapping.getIds()).isEqualTo("myId"); + assertThat(mapping.getCatalog()).isNull(); + assertThat(mapping.getSchema()).isEqualTo("mySchema"); + List<FieldMapping> fieldMappings = mapping.getFieldMappings(); + assertThat(fieldMappings.size()).isEqualTo(2); + assertThat(fieldMappings.get(0)).isEqualTo( + new FieldMapping("myid", FieldType.STRING.name(), "MYID", JDBCType.VARCHAR.name(), false)); + assertThat(fieldMappings.get(1)).isEqualTo( + new FieldMapping("name", FieldType.STRING.name(), "NAME", JDBCType.VARCHAR.name(), true)); + } + + + private static void assertValidResourcePDXMapping(RegionMapping mapping, String tableName) { + assertThat(mapping.getDataSourceName()).isEqualTo("connection"); + assertThat(mapping.getTableName()).isEqualTo(tableName); + assertThat(mapping.getPdxName()).isEqualTo("org.apache.geode.internal.ResourcePDX"); + assertThat(mapping.getIds()).isEqualTo("id"); + assertThat(mapping.getCatalog()).isNull(); + assertThat(mapping.getSchema()).isEqualTo("mySchema"); + List<FieldMapping> fieldMappings = mapping.getFieldMappings(); + assertThat(fieldMappings).hasSize(3); + assertThat(fieldMappings.get(0)) + .isEqualTo(new FieldMapping("id", "STRING", "ID", "VARCHAR", false)); + assertThat(fieldMappings.get(1)) + .isEqualTo(new FieldMapping("name", "STRING", "NAME", "VARCHAR", true)); + assertThat(fieldMappings.get(2)) + .isEqualTo(new FieldMapping("age", "INT", "AGE", "INTEGER", true)); + } + + public static class IdAndName implements PdxSerializable { + private String id; + private String name; + + public IdAndName() { + // nothing + } + + IdAndName(String id, String name) { + this.id = id; + this.name = name; + } + + String getId() { + return id; + } + + String getName() { + return name; + } + + @Override + public void toData(PdxWriter writer) { + writer.writeString("myid", this.id); + writer.writeString("name", this.name); + } + + @Override + public void fromData(PdxReader reader) { + this.id = reader.readString("myid"); + this.name = reader.readString("name"); + } + } + +} diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingFunction.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingFunction.java index 6536a4e..c1bf2a8 100644 --- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingFunction.java +++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingFunction.java @@ -26,6 +26,8 @@ import org.apache.geode.connectors.jdbc.internal.JdbcConnectorService; import org.apache.geode.connectors.jdbc.internal.RegionMappingExistsException; import org.apache.geode.connectors.jdbc.internal.configuration.RegionMapping; import org.apache.geode.connectors.util.internal.MappingCommandUtils; +import org.apache.geode.internal.cache.LocalRegion; +import org.apache.geode.internal.cache.PartitionedRegion; import org.apache.geode.management.cli.CliFunction; import org.apache.geode.management.internal.cli.functions.CliFunctionResult; @@ -75,9 +77,19 @@ public class CreateMappingFunction extends CliFunction<Object[]> { * and the given async-event-queue as one of its queues. */ private void alterRegion(Region<?, ?> region, String queueName, boolean synchronous) { - region.getAttributesMutator().setCacheLoader(new JdbcLoader()); + boolean isProxy = false; + if (region instanceof PartitionedRegion) { + isProxy = ((PartitionedRegion) region).getLocalMaxMemory() == 0; + } else { + isProxy = ((LocalRegion) region).isProxy(); + } + if (!isProxy) { + region.getAttributesMutator().setCacheLoader(new JdbcLoader()); + } if (synchronous) { - region.getAttributesMutator().setCacheWriter(new JdbcWriter()); + if (!isProxy) { + region.getAttributesMutator().setCacheWriter(new JdbcWriter()); + } } else { region.getAttributesMutator().addAsyncEventQueueId(queueName); }