This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch feature/GEODE-6518
in repository https://gitbox.apache.org/repos/asf/geode.git

commit d148855504f0f565e2c62f1b9b584158746dd021
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Thu Mar 14 17:34:48 2019 -0700

    GEODE-6518: for proxy region, should not add loader and writer for jdbc-mapping
---
 ...reateMappingCommandForProxyRegionDUnitTest.java | 434 +++++++++++++++++++++
 .../jdbc/internal/cli/CreateMappingFunction.java   |  16 +-
 2 files changed, 448 insertions(+), 2 deletions(-)

diff --git a/geode-connectors/src/distributedTest/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandForProxyRegionDUnitTest.java b/geode-connectors/src/distributedTest/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandForProxyRegionDUnitTest.java
new file mode 100644
index 0000000..ee57839
--- /dev/null
+++ b/geode-connectors/src/distributedTest/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandForProxyRegionDUnitTest.java
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc.internal.cli;
+
+import static 
org.apache.geode.connectors.jdbc.internal.cli.CreateMappingCommand.CREATE_MAPPING;
+import static 
org.apache.geode.connectors.util.internal.MappingConstants.DATA_SOURCE_NAME;
+import static 
org.apache.geode.connectors.util.internal.MappingConstants.GROUP_NAME;
+import static 
org.apache.geode.connectors.util.internal.MappingConstants.ID_NAME;
+import static 
org.apache.geode.connectors.util.internal.MappingConstants.PDX_NAME;
+import static 
org.apache.geode.connectors.util.internal.MappingConstants.REGION_NAME;
+import static 
org.apache.geode.connectors.util.internal.MappingConstants.SCHEMA_NAME;
+import static 
org.apache.geode.connectors.util.internal.MappingConstants.TABLE_NAME;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.sql.Connection;
+import java.sql.JDBCType;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.List;
+
+import javax.sql.DataSource;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
+import org.apache.geode.cache.configuration.CacheConfig;
+import org.apache.geode.cache.configuration.RegionAttributesType;
+import org.apache.geode.cache.configuration.RegionConfig;
+import org.apache.geode.connectors.jdbc.JdbcAsyncWriter;
+import org.apache.geode.connectors.jdbc.JdbcLoader;
+import org.apache.geode.connectors.jdbc.JdbcWriter;
+import org.apache.geode.connectors.jdbc.internal.JdbcConnectorService;
+import org.apache.geode.connectors.jdbc.internal.configuration.FieldMapping;
+import org.apache.geode.connectors.jdbc.internal.configuration.RegionMapping;
+import org.apache.geode.connectors.util.internal.MappingCommandUtils;
+import org.apache.geode.distributed.internal.InternalLocator;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.jndi.JNDIInvoker;
+import org.apache.geode.management.internal.cli.util.CommandStringBuilder;
+import org.apache.geode.pdx.FieldType;
+import org.apache.geode.pdx.PdxReader;
+import org.apache.geode.pdx.PdxSerializable;
+import org.apache.geode.pdx.PdxWriter;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.JDBCConnectorTest;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+import 
org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+
+@Category({JDBCConnectorTest.class})
+public class CreateMappingCommandForProxyRegionDUnitTest {
+
+  private static final String TEST_REGION = "testRegion";
+  private static final String TEST_GROUP1 = "testGroup1";
+  private static final String TEST_GROUP2 = "testGroup2";
+
+  @Rule
+  public transient GfshCommandRule gfsh = new GfshCommandRule();
+
+  @Rule
+  public ClusterStartupRule startupRule = new ClusterStartupRule();
+
+  @Rule
+  public SerializableTestName testName = new SerializableTestName();
+
+  @Rule
+  public SerializableTemporaryFolder temporaryFolder = new 
SerializableTemporaryFolder();
+
+  private MemberVM locator;
+  private MemberVM server1;
+  private MemberVM server2;
+
+  @Before
+  public void before() throws Exception {
+    locator = startupRule.startLocatorVM(0);
+    server1 = startupRule.startServerVM(1, TEST_GROUP1, locator.getPort());
+    server2 = startupRule.startServerVM(2, TEST_GROUP2, locator.getPort());
+
+    gfsh.connectAndVerify(locator);
+    setupDatabase();
+  }
+
+  @After
+  public void after() throws Exception {
+    teardownDatabase();
+  }
+
+  private void setupDatabase() {
+    gfsh.executeAndAssertThat(
+        "create data-source --name=connection"
+            + " --pooled=false"
+            + " --url=\"jdbc:derby:memory:newDB;create=true\"")
+        .statusIsSuccess();
+    executeSql(
+        "create table mySchema.myTable (myId varchar(10) primary key, name 
varchar(10))");
+  }
+
+  private void teardownDatabase() {
+    executeSql("drop table mySchema.myTable");
+  }
+
+  private void executeSql(String sql) {
+    for (MemberVM server : Arrays.asList(server1, server2)) {
+      server.invoke(() -> {
+        try {
+          DataSource ds = JNDIInvoker.getDataSource("connection");
+          Connection conn = ds.getConnection();
+          Statement sm = conn.createStatement();
+          sm.execute(sql);
+          sm.close();
+          conn.close();
+        } catch (SQLException e) {
+          throw new RuntimeException(e);
+        }
+      });
+    }
+  }
+
+  private void setupPartition(String regionName) {
+    gfsh.executeAndAssertThat("create region --name=" + regionName + " 
--type=PARTITION")
+        .statusIsSuccess();
+  }
+
+  private void setupGroupPartition(String regionName, String groupNames, 
boolean isAccessor) {
+    gfsh.executeAndAssertThat(
+        "create region --name=" + regionName + (isAccessor ? " 
--type=PARTITION_PROXY"
+            : " --type=PARTITION") + " --groups=" + groupNames)
+        .statusIsSuccess();
+  }
+
+  private static RegionMapping getRegionMappingFromClusterConfig(String 
regionName,
+      String groups) {
+    CacheConfig cacheConfig =
+        
InternalLocator.getLocator().getConfigurationPersistenceService().getCacheConfig(groups);
+    RegionConfig regionConfig = cacheConfig.getRegions().stream()
+        .filter(region -> 
region.getName().equals(convertRegionPathToName(regionName))).findFirst()
+        .orElse(null);
+    RegionMapping regionMapping =
+        (RegionMapping) regionConfig.getCustomRegionElements().stream()
+            .filter(element -> element instanceof 
RegionMapping).findFirst().orElse(null);
+    return regionMapping;
+  }
+
+  private static RegionMapping getRegionMappingFromService(String regionName) {
+    return ClusterStartupRule.getCache().getService(JdbcConnectorService.class)
+        .getMappingForRegion(convertRegionPathToName(regionName));
+  }
+
+  private static void validateAsyncEventQueueCreatedInClusterConfig(String 
regionName,
+      String groups,
+      boolean isParallel) {
+    CacheConfig cacheConfig =
+        
InternalLocator.getLocator().getConfigurationPersistenceService().getCacheConfig(groups);
+    List<CacheConfig.AsyncEventQueue> queueList = 
cacheConfig.getAsyncEventQueues();
+    String queueName = 
MappingCommandUtils.createAsyncEventQueueName(regionName);
+    CacheConfig.AsyncEventQueue queue = findQueue(queueList, queueName);
+    assertThat(queue).isNotNull();
+    assertThat(queue.getId()).isEqualTo(queueName);
+    assertThat(queue.getAsyncEventListener().getClassName())
+        .isEqualTo(JdbcAsyncWriter.class.getName());
+    assertThat(queue.isParallel()).isEqualTo(isParallel);
+  }
+
+  private static CacheConfig.AsyncEventQueue findQueue(
+      List<CacheConfig.AsyncEventQueue> queueList,
+      String queueName) {
+    for (CacheConfig.AsyncEventQueue queue : queueList) {
+      if (queue.getId().equals(queueName)) {
+        return queue;
+      }
+    }
+    return null;
+  }
+
+  private static String convertRegionPathToName(String regionPath) {
+    if (regionPath.startsWith("/")) {
+      return regionPath.substring(1);
+    }
+    return regionPath;
+  }
+
+  private static void validateRegionAlteredInClusterConfig(String regionName,
+      String groups,
+      boolean synchronous) {
+    CacheConfig cacheConfig =
+        
InternalLocator.getLocator().getConfigurationPersistenceService().getCacheConfig(groups);
+    RegionConfig regionConfig = cacheConfig.getRegions().stream()
+        .filter(region -> 
region.getName().equals(convertRegionPathToName(regionName))).findFirst()
+        .orElse(null);
+    RegionAttributesType attributes = regionConfig.getRegionAttributes();
+    
assertThat(attributes.getCacheLoader().getClassName()).isEqualTo(JdbcLoader.class.getName());
+    if (synchronous) {
+      
assertThat(attributes.getCacheWriter().getClassName()).isEqualTo(JdbcWriter.class.getName());
+    } else {
+      String queueName = 
MappingCommandUtils.createAsyncEventQueueName(regionName);
+      assertThat(attributes.getAsyncEventQueueIds()).isEqualTo(queueName);
+    }
+  }
+
+  private static void validateAsyncEventQueueCreatedOnServer(String regionName,
+      boolean isParallel) {
+    InternalCache cache = ClusterStartupRule.getCache();
+    String queueName = 
MappingCommandUtils.createAsyncEventQueueName(regionName);
+    AsyncEventQueue queue = cache.getAsyncEventQueue(queueName);
+    assertThat(queue).isNotNull();
+    
assertThat(queue.getAsyncEventListener()).isInstanceOf(JdbcAsyncWriter.class);
+    assertThat(queue.isParallel()).isEqualTo(isParallel);
+  }
+
+  private static void validateRegionAlteredOnServer(String regionName, boolean 
synchronous) {
+    InternalCache cache = ClusterStartupRule.getCache();
+    Region<?, ?> region = cache.getRegion(regionName);
+    boolean isProxy = false;
+    if (region instanceof PartitionedRegion) {
+      isProxy = ((PartitionedRegion) region).getLocalMaxMemory() == 0;
+    } else {
+      isProxy = ((LocalRegion) region).isProxy();
+    }
+    if (!isProxy) {
+      
assertThat(region.getAttributes().getCacheLoader()).isInstanceOf(JdbcLoader.class);
+    } else {
+      assertThat(region.getAttributes().getCacheLoader()).isNull();
+    }
+    if (synchronous) {
+      if (!isProxy) {
+        
assertThat(region.getAttributes().getCacheWriter()).isInstanceOf(JdbcWriter.class);
+      } else {
+        assertThat(region.getAttributes().getCacheWriter()).isNull();
+      }
+    } else {
+      String queueName = 
MappingCommandUtils.createAsyncEventQueueName(regionName);
+      
assertThat(region.getAttributes().getAsyncEventQueueIds()).contains(queueName);
+    }
+  }
+
+  @Test
+  public void createMappingTogetherForMultiServerGroupWithEmptyRegion() {
+    String regionName = "/" + TEST_REGION;
+    setupGroupPartition(regionName, TEST_GROUP1, false);
+    setupGroupPartition(regionName, TEST_GROUP2, true);
+    CommandStringBuilder csb = new CommandStringBuilder(CREATE_MAPPING);
+    csb.addOption(REGION_NAME, regionName);
+    csb.addOption(DATA_SOURCE_NAME, "connection");
+    csb.addOption(TABLE_NAME, "myTable");
+    csb.addOption(PDX_NAME, IdAndName.class.getName());
+    csb.addOption(ID_NAME, "myId");
+    csb.addOption(SCHEMA_NAME, "mySchema");
+    csb.addOption(GROUP_NAME, TEST_GROUP1 + "," + TEST_GROUP2);
+
+    gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
+
+    for (MemberVM server : Arrays.asList(server1, server2)) {
+      server.invoke(() -> {
+        RegionMapping mapping = getRegionMappingFromService(regionName);
+        assertValidMappingOnServer(mapping, regionName, false, true);
+      });
+    }
+
+    server1.invoke(() -> {
+      RegionMapping mapping = getRegionMappingFromService(regionName);
+      assertThat(mapping).isNotNull();
+    });
+    server2.invoke(() -> {
+      RegionMapping mapping = getRegionMappingFromService(regionName);
+      assertThat(mapping).isNotNull();
+    });
+
+    locator.invoke(() -> {
+      RegionMapping regionMapping = 
getRegionMappingFromClusterConfig(regionName, TEST_GROUP1);
+      assertValidMappingOnLocator(regionMapping, regionName, TEST_GROUP1, 
false, true);
+      regionMapping = getRegionMappingFromClusterConfig(regionName, 
TEST_GROUP2);
+      assertValidMappingOnLocator(regionMapping, regionName, TEST_GROUP2, 
false, true);
+    });
+  }
+
+  @Test
+  public void 
createMappingSeparatelyTogetherForMultiServerGroupWithEmptyRegion() {
+    String regionName = "/" + TEST_REGION;
+    setupGroupPartition(regionName, TEST_GROUP1, false);
+    setupGroupPartition(regionName, TEST_GROUP2, true);
+    CommandStringBuilder csb = new CommandStringBuilder(CREATE_MAPPING);
+    csb.addOption(REGION_NAME, regionName);
+    csb.addOption(DATA_SOURCE_NAME, "connection");
+    csb.addOption(TABLE_NAME, "myTable");
+    csb.addOption(PDX_NAME, IdAndName.class.getName());
+    csb.addOption(ID_NAME, "myId");
+    csb.addOption(SCHEMA_NAME, "mySchema");
+    csb.addOption(GROUP_NAME, TEST_GROUP1);
+
+    gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
+
+    csb = new CommandStringBuilder(CREATE_MAPPING);
+    csb.addOption(REGION_NAME, regionName);
+    csb.addOption(DATA_SOURCE_NAME, "connection");
+    csb.addOption(TABLE_NAME, "myTable");
+    csb.addOption(PDX_NAME, IdAndName.class.getName());
+    csb.addOption(ID_NAME, "myId");
+    csb.addOption(SCHEMA_NAME, "mySchema");
+    csb.addOption(GROUP_NAME, TEST_GROUP2);
+
+    gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
+
+    for (MemberVM server : Arrays.asList(server1, server2)) {
+      server.invoke(() -> {
+        RegionMapping mapping = getRegionMappingFromService(regionName);
+        assertValidMappingOnServer(mapping, regionName, false, true);
+      });
+    }
+
+    server1.invoke(() -> {
+      RegionMapping mapping = getRegionMappingFromService(regionName);
+      assertThat(mapping).isNotNull();
+    });
+    server2.invoke(() -> {
+      RegionMapping mapping = getRegionMappingFromService(regionName);
+      assertThat(mapping).isNotNull();
+    });
+
+    locator.invoke(() -> {
+      RegionMapping regionMapping = 
getRegionMappingFromClusterConfig(regionName, TEST_GROUP1);
+      assertValidMappingOnLocator(regionMapping, regionName, TEST_GROUP1, 
false, true);
+      regionMapping = getRegionMappingFromClusterConfig(regionName, 
TEST_GROUP2);
+      assertValidMappingOnLocator(regionMapping, regionName, TEST_GROUP2, 
false, true);
+    });
+  }
+
+  private static void assertValidMappingOnServer(RegionMapping mapping, String 
regionName,
+      boolean synchronous, boolean isParallel) {
+    assertValidMapping(mapping);
+    validateRegionAlteredOnServer(regionName, synchronous);
+    if (!synchronous) {
+      validateAsyncEventQueueCreatedOnServer(regionName, isParallel);
+    }
+  }
+
+  private static void assertValidMappingOnLocator(RegionMapping mapping, 
String regionName,
+      String groups,
+      boolean synchronous, boolean isParallel) {
+    assertValidMapping(mapping);
+    validateRegionAlteredInClusterConfig(regionName, groups, synchronous);
+    if (!synchronous) {
+      validateAsyncEventQueueCreatedInClusterConfig(regionName, groups, 
isParallel);
+    }
+  }
+
+  private static void assertValidMapping(RegionMapping mapping) {
+    assertThat(mapping.getDataSourceName()).isEqualTo("connection");
+    assertThat(mapping.getTableName()).isEqualTo("myTable");
+    assertThat(mapping.getPdxName()).isEqualTo(IdAndName.class.getName());
+    assertThat(mapping.getIds()).isEqualTo("myId");
+    assertThat(mapping.getCatalog()).isNull();
+    assertThat(mapping.getSchema()).isEqualTo("mySchema");
+    List<FieldMapping> fieldMappings = mapping.getFieldMappings();
+    assertThat(fieldMappings.size()).isEqualTo(2);
+    assertThat(fieldMappings.get(0)).isEqualTo(
+        new FieldMapping("myid", FieldType.STRING.name(), "MYID", 
JDBCType.VARCHAR.name(), false));
+    assertThat(fieldMappings.get(1)).isEqualTo(
+        new FieldMapping("name", FieldType.STRING.name(), "NAME", 
JDBCType.VARCHAR.name(), true));
+  }
+
+
+  private static void assertValidResourcePDXMapping(RegionMapping mapping, 
String tableName) {
+    assertThat(mapping.getDataSourceName()).isEqualTo("connection");
+    assertThat(mapping.getTableName()).isEqualTo(tableName);
+    
assertThat(mapping.getPdxName()).isEqualTo("org.apache.geode.internal.ResourcePDX");
+    assertThat(mapping.getIds()).isEqualTo("id");
+    assertThat(mapping.getCatalog()).isNull();
+    assertThat(mapping.getSchema()).isEqualTo("mySchema");
+    List<FieldMapping> fieldMappings = mapping.getFieldMappings();
+    assertThat(fieldMappings).hasSize(3);
+    assertThat(fieldMappings.get(0))
+        .isEqualTo(new FieldMapping("id", "STRING", "ID", "VARCHAR", false));
+    assertThat(fieldMappings.get(1))
+        .isEqualTo(new FieldMapping("name", "STRING", "NAME", "VARCHAR", 
true));
+    assertThat(fieldMappings.get(2))
+        .isEqualTo(new FieldMapping("age", "INT", "AGE", "INTEGER", true));
+  }
+
+  public static class IdAndName implements PdxSerializable {
+    private String id;
+    private String name;
+
+    public IdAndName() {
+      // nothing
+    }
+
+    IdAndName(String id, String name) {
+      this.id = id;
+      this.name = name;
+    }
+
+    String getId() {
+      return id;
+    }
+
+    String getName() {
+      return name;
+    }
+
+    @Override
+    public void toData(PdxWriter writer) {
+      writer.writeString("myid", this.id);
+      writer.writeString("name", this.name);
+    }
+
+    @Override
+    public void fromData(PdxReader reader) {
+      this.id = reader.readString("myid");
+      this.name = reader.readString("name");
+    }
+  }
+
+}
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingFunction.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingFunction.java
index 6536a4e..c1bf2a8 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingFunction.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingFunction.java
@@ -26,6 +26,8 @@ import org.apache.geode.connectors.jdbc.internal.JdbcConnectorService;
 import org.apache.geode.connectors.jdbc.internal.RegionMappingExistsException;
 import org.apache.geode.connectors.jdbc.internal.configuration.RegionMapping;
 import org.apache.geode.connectors.util.internal.MappingCommandUtils;
+import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.PartitionedRegion;
 import org.apache.geode.management.cli.CliFunction;
 import org.apache.geode.management.internal.cli.functions.CliFunctionResult;
 
@@ -75,9 +77,19 @@ public class CreateMappingFunction extends CliFunction<Object[]> {
    * and the given async-event-queue as one of its queues.
    */
   private void alterRegion(Region<?, ?> region, String queueName, boolean 
synchronous) {
-    region.getAttributesMutator().setCacheLoader(new JdbcLoader());
+    boolean isProxy = false;
+    if (region instanceof PartitionedRegion) {
+      isProxy = ((PartitionedRegion) region).getLocalMaxMemory() == 0;
+    } else {
+      isProxy = ((LocalRegion) region).isProxy();
+    }
+    if (!isProxy) {
+      region.getAttributesMutator().setCacheLoader(new JdbcLoader());
+    }
     if (synchronous) {
-      region.getAttributesMutator().setCacheWriter(new JdbcWriter());
+      if (!isProxy) {
+        region.getAttributesMutator().setCacheWriter(new JdbcWriter());
+      }
     } else {
       region.getAttributesMutator().addAsyncEventQueueId(queueName);
     }

Reply via email to