Github user ajantha-bhat commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/3019#discussion_r244153304
  
    --- Diff: 
integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
 ---
    @@ -17,69 +17,177 @@
     
     package org.apache.carbondata.presto;
     
    +import java.lang.management.ManagementFactory;
    +import java.lang.reflect.*;
     import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
     
     import static java.util.Objects.requireNonNull;
     
    -import com.facebook.presto.spi.ConnectorHandleResolver;
    +import org.apache.carbondata.presto.impl.CarbonTableConfig;
    +
    +import com.facebook.presto.hive.HiveConnector;
    +import com.facebook.presto.hive.HiveConnectorFactory;
    +import com.facebook.presto.hive.HiveMetadataFactory;
    +import com.facebook.presto.hive.HiveProcedureModule;
    +import com.facebook.presto.hive.HiveSchemaProperties;
    +import com.facebook.presto.hive.HiveSessionProperties;
    +import com.facebook.presto.hive.HiveStorageFormat;
    +import com.facebook.presto.hive.HiveTableProperties;
    +import com.facebook.presto.hive.HiveTransactionManager;
    +import com.facebook.presto.hive.NodeVersion;
    +import com.facebook.presto.hive.RebindSafeMBeanServer;
    +import com.facebook.presto.hive.authentication.HiveAuthenticationModule;
    +import com.facebook.presto.hive.metastore.HiveMetastoreModule;
    +import com.facebook.presto.hive.s3.HiveS3Module;
    +import com.facebook.presto.hive.security.HiveSecurityModule;
    +import com.facebook.presto.hive.security.PartitionsAwareAccessControl;
    +import com.facebook.presto.spi.NodeManager;
    +import com.facebook.presto.spi.PageIndexerFactory;
    +import com.facebook.presto.spi.PageSorter;
     import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
    -import com.facebook.presto.spi.connector.*;
    -import 
com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorMetadata;
    +import com.facebook.presto.spi.connector.Connector;
    +import com.facebook.presto.spi.connector.ConnectorAccessControl;
    +import com.facebook.presto.spi.connector.ConnectorContext;
    +import com.facebook.presto.spi.connector.ConnectorNodePartitioningProvider;
    +import com.facebook.presto.spi.connector.ConnectorPageSinkProvider;
    +import com.facebook.presto.spi.connector.ConnectorPageSourceProvider;
    +import com.facebook.presto.spi.connector.ConnectorSplitManager;
    +import 
com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorPageSinkProvider;
     import 
com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorPageSourceProvider;
     import 
com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorSplitManager;
    -import com.google.common.base.Throwables;
    +import 
com.facebook.presto.spi.connector.classloader.ClassLoaderSafeNodePartitioningProvider;
    +import com.facebook.presto.spi.procedure.Procedure;
    +import com.facebook.presto.spi.type.TypeManager;
    +import com.google.common.collect.ImmutableSet;
     import com.google.inject.Injector;
    +import com.google.inject.Key;
    +import com.google.inject.TypeLiteral;
     import io.airlift.bootstrap.Bootstrap;
     import io.airlift.bootstrap.LifeCycleManager;
    +import io.airlift.event.client.EventModule;
     import io.airlift.json.JsonModule;
    +import io.airlift.units.DataSize;
    +import org.weakref.jmx.guice.MBeanModule;
    +import sun.reflect.ConstructorAccessor;
     
    +import static com.google.common.base.Throwables.throwIfUnchecked;
    +import static io.airlift.configuration.ConfigBinder.configBinder;
     
     /**
      * Build Carbondata Connector
      * It will be called by CarbondataPlugin
      */
    -public class CarbondataConnectorFactory implements ConnectorFactory {
    +public class CarbondataConnectorFactory extends HiveConnectorFactory {
     
    -  private final String name;
       private final ClassLoader classLoader;
     
       public CarbondataConnectorFactory(String connectorName, ClassLoader 
classLoader) {
    -    this.name = connectorName;
    +    super(connectorName, classLoader, null);
         this.classLoader = requireNonNull(classLoader, "classLoader is null");
       }
     
    -  @Override public String getName() {
    -    return name;
    -  }
    -
    -  @Override public ConnectorHandleResolver getHandleResolver() {
    -    return new CarbondataHandleResolver();
    -  }
    -
    -  @Override public Connector create(String connectorId, Map<String, 
String> config,
    +  @Override public Connector create(String catalogName, Map<String, 
String> config,
           ConnectorContext context) {
         requireNonNull(config, "config is null");
     
         try (ThreadContextClassLoader ignored = new 
ThreadContextClassLoader(classLoader)) {
    -      Bootstrap app = new Bootstrap(new JsonModule(),
    -          new CarbondataModule(connectorId, context.getTypeManager()));
    +      Bootstrap app = new Bootstrap(new EventModule(), new MBeanModule(), 
new JsonModule(),
    +          new CarbondataModule(catalogName), new HiveS3Module(catalogName),
    +          new HiveMetastoreModule(catalogName, Optional.ofNullable(null)), 
new HiveSecurityModule(),
    +          new HiveAuthenticationModule(), new HiveProcedureModule(), 
binder -> {
    +        javax.management.MBeanServer platformMBeanServer =
    +            ManagementFactory.getPlatformMBeanServer();
    +        binder.bind(javax.management.MBeanServer.class)
    +            .toInstance(new RebindSafeMBeanServer(platformMBeanServer));
    +        binder.bind(NodeVersion.class)
    +            .toInstance(new 
NodeVersion(context.getNodeManager().getCurrentNode().getVersion()));
    +        
binder.bind(NodeManager.class).toInstance(context.getNodeManager());
    +        
binder.bind(TypeManager.class).toInstance(context.getTypeManager());
    +        
binder.bind(PageIndexerFactory.class).toInstance(context.getPageIndexerFactory());
    +        binder.bind(PageSorter.class).toInstance(context.getPageSorter());
    +        configBinder(binder).bindConfig(CarbonTableConfig.class);
    +      });
     
           Injector injector =
               
app.strictConfig().doNotInitializeLogging().setRequiredConfigurationProperties(config)
                   .initialize();
     
    +      setCarbonEnum();
    +
           LifeCycleManager lifeCycleManager = 
injector.getInstance(LifeCycleManager.class);
    -      ConnectorMetadata metadata = 
injector.getInstance(CarbondataMetadata.class);
    +      HiveMetadataFactory metadataFactory = 
injector.getInstance(HiveMetadataFactory.class);
    +      HiveTransactionManager transactionManager =
    +          injector.getInstance(HiveTransactionManager.class);
           ConnectorSplitManager splitManager = 
injector.getInstance(ConnectorSplitManager.class);
           ConnectorPageSourceProvider connectorPageSource =
               injector.getInstance(ConnectorPageSourceProvider.class);
    +      ConnectorPageSinkProvider pageSinkProvider =
    +          injector.getInstance(ConnectorPageSinkProvider.class);
    +      ConnectorNodePartitioningProvider connectorDistributionProvider =
    +          injector.getInstance(ConnectorNodePartitioningProvider.class);
    +      HiveSessionProperties hiveSessionProperties =
    +          injector.getInstance(HiveSessionProperties.class);
    +      HiveTableProperties hiveTableProperties = 
injector.getInstance(HiveTableProperties.class);
    +      ConnectorAccessControl accessControl =
    +          new 
PartitionsAwareAccessControl(injector.getInstance(ConnectorAccessControl.class));
    +      Set<Procedure> procedures = injector.getInstance(Key.get(new 
TypeLiteral<Set<Procedure>>() {
    +      }));
     
    -      return new CarbondataConnector(lifeCycleManager,
    -          new ClassLoaderSafeConnectorMetadata(metadata, classLoader),
    -          new ClassLoaderSafeConnectorSplitManager(splitManager, 
classLoader), classLoader,
    -          new 
ClassLoaderSafeConnectorPageSourceProvider(connectorPageSource, classLoader));
    +      return new HiveConnector(lifeCycleManager, metadataFactory, 
transactionManager,
    +          new ClassLoaderSafeConnectorSplitManager(splitManager, 
classLoader),
    +          new 
ClassLoaderSafeConnectorPageSourceProvider(connectorPageSource, classLoader),
    +          new ClassLoaderSafeConnectorPageSinkProvider(pageSinkProvider, 
classLoader),
    +          new 
ClassLoaderSafeNodePartitioningProvider(connectorDistributionProvider, 
classLoader),
    +          ImmutableSet.of(), procedures, 
hiveSessionProperties.getSessionProperties(),
    +          HiveSchemaProperties.SCHEMA_PROPERTIES, 
hiveTableProperties.getTableProperties(),
    +          accessControl, classLoader);
         } catch (Exception e) {
    -      throw Throwables.propagate(e);
    +      throwIfUnchecked(e);
    +      throw new RuntimeException(e);
         }
       }
    +
    +  /**
    +   * Set the Carbon format enum to HiveStorageFormat, its a hack but for 
time being it is best
    +   * choice to avoid lot of code change.
    +   *
    +   * @throws Exception
    +   */
    +  private void setCarbonEnum() throws Exception {
    +    for (HiveStorageFormat format : HiveStorageFormat.values()) {
    +      if (format.name().equals("CARBON")) {
    +        return;
    +      }
    +    }
    +    Constructor<?>[] declaredConstructors = 
HiveStorageFormat.class.getDeclaredConstructors();
    +    declaredConstructors[0].setAccessible(true);
    +    Field constructorAccessorField = 
Constructor.class.getDeclaredField("constructorAccessor");
    +    constructorAccessorField.setAccessible(true);
    +    ConstructorAccessor ca =
    +        (ConstructorAccessor) 
constructorAccessorField.get(declaredConstructors[0]);
    +    if (ca == null) {
    +      Method acquireConstructorAccessorMethod =
    +          
Constructor.class.getDeclaredMethod("acquireConstructorAccessor");
    +      acquireConstructorAccessorMethod.setAccessible(true);
    +      ca = (ConstructorAccessor) 
acquireConstructorAccessorMethod.invoke(declaredConstructors[0]);
    +    }
    +    Object instance = ca.newInstance(new Object[] { "CARBON", 
HiveStorageFormat.values().length, "",
    +        "org.apache.carbondata.hadoop.api.CarbonFileInputFormat",
    --- End diff --
    
    should this be TableInputFormat ?
    
    because in CarbondataPageSourceProvider.java we have used TableInputFormat


---

Reply via email to