import org.apache.geode.cache.Operation;
import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.query.CqAttributes;
import org.apache.geode.cache.query.CqAttributesFactory;
import org.apache.geode.cache.query.CqQuery;
import org.apache.geode.cache.query.CqResults;
import org.apache.geode.cache.query.QueryService;
import org.apache.geode.cache.query.Struct;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.pdx.internal.PdxType;
import org.apache.geode.pdx.internal.TypeRegistry;

/**
 * Client that creates a continuous query (CQ) over a region, feeds the CQ's
 * initial results and subsequent events to a {@link MigrationListener}, and
 * then blocks the calling thread until it is interrupted.
 */
public class MigrationClient {

  /** Client cache created by a {@link ClientCacheFactory} with no programmatic configuration. */
  private final ClientCache cache;

  /** Listener attached to the CQ; also invoked directly for each initial result. */
  private final MigrationListener migrationListener;

  /**
   * Entry point: runs the migration for region {@code data} using pool {@code ln-pool}.
   *
   * @param args command-line arguments (unused)
   * @throws Exception if creating or executing the CQ fails, or if the waiting thread is
   *         interrupted
   */
  public static void main(String[] args) throws Exception {
    MigrationClient client = new MigrationClient("data");
    client.runMigration("ln-pool", "data");
  }

  /**
   * Creates the client cache and the listener that will process events.
   *
   * @param regionName region name handed to the {@link MigrationListener}
   */
  public MigrationClient(String regionName) {
    this.cache = new ClientCacheFactory().create();
    this.migrationListener = new MigrationListener(this.cache, regionName);
  }

  /**
   * Creates and executes the CQ, processes its initial results, signals readiness for events,
   * and then blocks until interrupted.
   *
   * @param poolName name of the pool whose query service hosts the CQ
   * @param regionName name of the region the CQ selects from
   * @throws Exception if creating or executing the CQ fails, or if the waiting thread is
   *         interrupted
   */
  private void runMigration(String poolName, String regionName) throws Exception {
    // Create CQ
    CqQuery cq = createCQ(poolName, regionName);

    // Execute CQ
    CqResults<Struct> results = cq.executeWithInitialResults();

    // Register PdxTypes on all pools
    // Note: This is only necessary if PDX serialization is used
    // See https://issues.apache.org/jira/browse/GEODE-6271
    registerPdxTypesOnAllPools();

    // Process initial results
    processInitialResults(results);

    // Send readyForEvents to start receiving queued events
    this.cache.readyForEvents();

    // Wait forever
    waitForever();
  }

  /**
   * Builds a CQ named {@code <poolName>_cq} that selects everything from the given region and
   * has {@link #migrationListener} attached as its CQ listener. The CQ is created but not
   * executed here.
   *
   * @param poolName name of the pool whose query service creates the CQ
   * @param regionName name of the region to query
   * @return the newly created, not yet executed CQ
   * @throws Exception if the query service cannot create the CQ
   */
  private CqQuery createCQ(String poolName, String regionName) throws Exception {
    // Get QueryService on the pool
    QueryService queryService = this.cache.getQueryService(poolName);

    // Create CQ Attributes
    CqAttributesFactory cqAf = new CqAttributesFactory();

    // Add CqListener
    cqAf.addCqListener(this.migrationListener);
    CqAttributes cqa = cqAf.create();

    // Create CQ
    String cqName = poolName + "_cq";
    String cqQuery = "SELECT * FROM /" + regionName;
    CqQuery cq = queryService.newCq(cqName, cqQuery, cqa);
    System.out.println("Created cqName=" + cqName + "; cq=" + cq);
    return cq;
  }

  /**
   * Re-adds every {@link PdxType} currently known to the cache's PDX type registry through
   * {@code addImportedType}. This relies on Geode internal APIs ({@link InternalCache},
   * {@link TypeRegistry}).
   *
   * NOTE(review): presumably this makes the types known on every pool, as the GEODE-6271
   * reference at the call site suggests - confirm against the Geode version in use.
   */
  private void registerPdxTypesOnAllPools() {
    TypeRegistry tr = ((InternalCache) this.cache).getPdxRegistry();
    for (PdxType pdxType : tr.typeMap().values()) {
      tr.addImportedType(pdxType.getTypeId(), pdxType);
    }
  }

  /**
   * Hands each initial CQ result to the listener as a {@link Operation#CREATE} event, reading
   * the {@code key} and {@code value} fields of every result struct.
   *
   * @param results initial results returned by executing the CQ
   */
  private void processInitialResults(CqResults<Struct> results) {
    for (Struct result : results) {
      this.migrationListener.processEvent(Operation.CREATE, result.get("key"), result.get("value"));
    }
  }

  /**
   * Blocks the calling thread until it is interrupted.
   *
   * @throws InterruptedException if the waiting thread is interrupted
   */
  protected void waitForever() throws InterruptedException {
    Object monitor = new Object();
    synchronized (monitor) {
      // Object.wait() is allowed to return spuriously, so it has to be called in a loop.
      // Nothing else can reach this private monitor to notify it, so only an interrupt
      // (surfacing as InterruptedException) ends the wait.
      while (true) {
        monitor.wait();
      }
    }
  }
}