[ https://issues.apache.org/jira/browse/GRIFFIN-213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16690744#comment-16690744 ]
ASF GitHub Bot commented on GRIFFIN-213:
----------------------------------------
Github user chemikadze commented on a diff in the pull request:
https://github.com/apache/incubator-griffin/pull/456#discussion_r234426015
--- Diff:
measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
---
@@ -84,6 +87,26 @@ object DataConnectorFactory extends Loggable {
}
}
+  private def getCustomConnector(session: SparkSession,
+                                 context: StreamingContext,
+                                 param: DataConnectorParam,
+                                 storage: TimestampStorage,
+                                 maybeClient: Option[StreamingCacheClient]): DataConnector = {
+    val className = param.getConfig("class").asInstanceOf[String]
+    val cls = Class.forName(className)
+    if (classOf[BatchDataConnector].isAssignableFrom(cls)) {
+      val ctx = BatchDataConnectorContext(session, param, storage)
+      val meth = cls.getDeclaredMethod("apply", classOf[BatchDataConnectorContext])
+      meth.invoke(null, ctx).asInstanceOf[BatchDataConnector]
+    } else if (classOf[StreamingDataConnector].isAssignableFrom(cls)) {
+      val ctx = StreamingDataConnectorContext(session, context, param, storage, maybeClient)
+      val meth = cls.getDeclaredMethod("apply", classOf[StreamingDataConnectorContext])
+      meth.invoke(null, ctx).asInstanceOf[StreamingDataConnector]
+    } else {
+      throw new ClassCastException("")
--- End diff --
Oh, thanks for reminding! Planned to do that, but got distracted.
> Support pluggable datasource connectors
> ---------------------------------------
>
> Key: GRIFFIN-213
> URL: https://issues.apache.org/jira/browse/GRIFFIN-213
> Project: Griffin (Incubating)
> Issue Type: Improvement
> Reporter: Nikolay Sokolov
> Priority: Minor
>
> As of Griffin 0.3, adding a new data connector requires modifying Griffin's
> code.
> The proposal is to add a new data connector type, CUSTOM, that allows
> specifying the class name of the data connector implementation to use.
> Additional jars with custom connector implementations would be provided in
> the Spark configuration template.
> The class name would be specified in the "class" config of the data connector.
> For example:
> {code:json}
> "connectors": [
> {
> "type": "CUSTOM",
> "config": {
> "class": "org.example.griffin.JDBCConnector"
> // extra connector-specific parameters
> }
> }
> ]
> {code}
> Proposed contract for implementations is based on current convention:
> - for batch
> ** class should be a subclass of BatchDataConnector
> ** it should have a method with the signature:
> {code:java}
> public static BatchDataConnector apply(BatchDataConnectorContext ctx)
> {code}
> - for streaming
> ** class should be a subclass of StreamingDataConnector
> ** it should have a method with the signature:
> {code:java}
> public static StreamingDataConnector apply(StreamingDataConnectorContext ctx)
> {code}
> Signatures of context objects:
> {code:scala}
> case class BatchDataConnectorContext(@transient sparkSession: SparkSession,
>                                      dcParam: DataConnectorParam,
>                                      timestampStorage: TimestampStorage)
> case class StreamingDataConnectorContext(@transient sparkSession: SparkSession,
>                                          @transient ssc: StreamingContext,
>                                          dcParam: DataConnectorParam,
>                                          timestampStorage: TimestampStorage,
>                                          streamingCacheClientOpt: Option[StreamingCacheClient])
> {code}
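To illustrate the batch half of the contract described above, here is a minimal, self-contained sketch. It is not Griffin code: `DataConnectorParam`, `BatchDataConnectorContext` and `BatchDataConnector` are simplified stand-ins (no Spark session or timestamp storage), and `InMemoryConnector`, `CustomConnectorLoader` and the wording of the error message are hypothetical names and choices made for the example. It assumes the definitions are compiled as top-level classes, so that the Scala compiler emits a static forwarder for the companion object's `apply`, which is what makes the `invoke(null, ctx)` call work.

```scala
// Simplified stand-ins for the Griffin types (assumptions, not the real API).
final case class DataConnectorParam(config: Map[String, Any]) {
  def getConfig: Map[String, Any] = config
}

final case class BatchDataConnectorContext(dcParam: DataConnectorParam)

trait BatchDataConnector {
  def describe: String
}

// Hypothetical custom connector: a subclass of BatchDataConnector ...
class InMemoryConnector(ctx: BatchDataConnectorContext) extends BatchDataConnector {
  def describe: String = {
    val table = ctx.dcParam.getConfig.getOrElse("table", "n/a")
    s"in-memory connector, table=$table"
  }
}

// ... whose companion object supplies apply(ctx). For a top-level class the
// compiler exposes this on the class as a static method as well.
object InMemoryConnector {
  def apply(ctx: BatchDataConnectorContext): BatchDataConnector =
    new InMemoryConnector(ctx)
}

// Hypothetical loader mirroring the batch branch of the diff above.
object CustomConnectorLoader {
  def load(param: DataConnectorParam): BatchDataConnector = {
    val className = param.getConfig("class").asInstanceOf[String]
    val cls = Class.forName(className)
    if (classOf[BatchDataConnector].isAssignableFrom(cls)) {
      val ctx = BatchDataConnectorContext(param)
      // Look up the static forwarder and call it without a receiver.
      val meth = cls.getDeclaredMethod("apply", classOf[BatchDataConnectorContext])
      meth.invoke(null, ctx).asInstanceOf[BatchDataConnector]
    } else {
      // A descriptive message is an addition made for this sketch.
      throw new ClassCastException(s"$className must extend BatchDataConnector")
    }
  }
}

object CustomConnectorDemo {
  def main(args: Array[String]): Unit = {
    // In a real job the class name would come from the "class" config entry.
    val param = DataConnectorParam(
      Map("class" -> classOf[InMemoryConnector].getName, "table" -> "users"))
    println(CustomConnectorLoader.load(param).describe)
  }
}
```

A streaming connector would follow the same pattern with the streaming context type. If the connector class lacks a matching `apply`, `getDeclaredMethod` throws `NoSuchMethodException`, so a loader may want to catch that and report the expected signature.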
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)