Custom InputRowParser example:
> I created a jar containing the service registration file
> /resources/META-INF/services/io.druid.initialization.DruidModule,
> uploaded the jar under druid/extensions/druid-reporting-transformer,
> and added the custom extension to
> conf-quickstart/druid/_common/common.runtime.properties:
> druid.extensions.loadList=["druid-reporting-transformer"]
> (Note: the Kafka supervisor used below also requires "druid-kafka-indexing-service" to be in this loadList.)
This is my custom InputRowParser:
/**
 * Druid {@link ByteBufferInputRowParser} that decodes each message as UTF-8 JSON, adds team
 * fields to it via {@link #populateTeam(Map)}, re-serializes it, and hands the result to a
 * {@link StringInputRowParser} built from the same {@link ParseSpec}.
 */
public class ExampleInputRowParser implements ByteBufferInputRowParser {
  /** Type name of this parser; presumably the name the extension module registers — confirm. */
  public static final String TYPE_NAME = "exampleParser";

  private final ParseSpec parseSpec;

  /** Delegate that applies {@code parseSpec} to the enriched JSON; created lazily on first parse. */
  private StringInputRowParser stringParser;

  @JsonCreator
  public ExampleInputRowParser(@JsonProperty("parseSpec") ParseSpec parseSpec) {
    this.parseSpec = parseSpec;
  }

  @Override
  public ParseSpec getParseSpec() {
    return parseSpec;
  }

  /**
   * Parses one message into a single-row batch.
   *
   * @param input UTF-8 encoded JSON message
   * @return a one-element list containing the parsed row (never {@code null})
   * @throws java.io.UncheckedIOException if the JSON helpers fail with an {@link IOException};
   *     the failure is propagated with its cause rather than returning {@code null}, which a
   *     caller iterating the batch would only discover later as a NullPointerException
   */
  @Override
  public List<InputRow> parseBatch(ByteBuffer input) {
    if (stringParser == null) {
      stringParser = new StringInputRowParser(parseSpec);
    }
    String objstring = StringUtils.fromUtf8(input);
    try {
      Map requestMap = (Map) JsonUtil.stringToObj(objstring, Map.class);
      populateTeam(requestMap);
      String stringInput = JsonUtil.deepString(requestMap);
      return ImmutableList.of(stringParser.parse(stringInput));
    } catch (IOException e) {
      throw new java.io.UncheckedIOException("Failed to parse message as JSON", e);
    }
  }

  @Override
  public ByteBufferInputRowParser withParseSpec(ParseSpec parseSpec) {
    return new ExampleInputRowParser(parseSpec);
  }

  /**
   * Adds {@code teamId}, {@code teamName} and {@code parent} to the first element of
   * {@code payload.outboundMsg.taskEntities[0].taskInstanceEntities}, mutating
   * {@code requestMap} in place.
   *
   * <p>If any level of that path is missing, has the wrong type, or is an empty list, the map is
   * left unchanged instead of throwing, so messages without the expected shape still parse.
   *
   * @param requestMap decoded JSON message; may be {@code null}
   */
  @SuppressWarnings({"unchecked", "rawtypes"})
  public void populateTeam(Map requestMap) {
    Map payload = childMap(requestMap, "payload");
    Map outboundMsg = childMap(payload, "outboundMsg");
    Map taskEntity = firstMapInList(outboundMsg, "taskEntities");
    Map taskMap = firstMapInList(taskEntity, "taskInstanceEntities");
    if (taskMap == null) {
      return;
    }
    // TODO: replace these hard-coded placeholders with a real lookup
    // (presumably keyed by the task's "assignedToUserId" — confirm).
    taskMap.put("teamId", "userteamid");
    taskMap.put("teamName", "userteamName");
    taskMap.put("parent", "userteamtead");
  }

  /** Returns {@code parent.get(key)} if it is a Map, otherwise {@code null}. */
  @SuppressWarnings("rawtypes")
  private static Map childMap(Map parent, String key) {
    if (parent == null) {
      return null;
    }
    Object value = parent.get(key);
    return value instanceof Map ? (Map) value : null;
  }

  /**
   * Returns the first element of the list stored at {@code parent.get(key)} if that value is a
   * non-empty List whose first element is a Map, otherwise {@code null}.
   */
  @SuppressWarnings("rawtypes")
  private static Map firstMapInList(Map parent, String key) {
    if (parent == null) {
      return null;
    }
    Object value = parent.get(key);
    if (!(value instanceof List) || ((List) value).isEmpty()) {
      return null;
    }
    Object first = ((List) value).get(0);
    return first instanceof Map ? (Map) first : null;
  }
}
Then I restarted all Druid nodes:
[historical, broker, overlord, coordinator, middlemanager]
Created the topic in Kafka:
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Reporting_Transformer
Created the ingestion spec under quickstart,
reporting-transformerspec.json:
{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "Reporting_Transformer",
    "parser": {
      "type": "reportingParser",
      "parseSpec": {
        "format": "json",
        "flattenSpec": {
          "useFieldDiscovery": true,
          "fields": [
            {
              "type": "path",
              "name": "task_name",
              "expr": "$.payload.outboundMsg.taskEntities[0].taskInstanceEntities[0].taskName"
            },
            {
              "type": "path",
              "name": "team_name",
              "expr": "$.payload.outboundMsg.taskEntities[0].taskInstanceEntities[0].teamName"
            },
            {
              "type": "path",
              "name": "team_id",
              "expr": "$.payload.outboundMsg.taskEntities[0].taskInstanceEntities[0].teamId"
            },
            {
              "type": "path",
              "name": "parent",
              "expr": "$.payload.outboundMsg.taskEntities[0].taskInstanceEntities[0].parent"
            },
            {
              "type": "path",
              "name": "created_date",
              "expr": "$.payload.outboundMsg.taskEntities[0].taskInstanceEntities[0].createdDate"
            }
          ]
        },
        "timestampSpec": {
          "column": "created_date",
          "format": "auto"
        },
        "dimensionsSpec": {
          "dimensions": ["task_name", "team_id", "team_name", "parent", {"name": "created_date", "type": "long"}]
        }
      }
    },
    "metricsSpec": [],
    "transformSpec": {
      "transforms": []
    },
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "HOUR",
      "queryGranularity": "NONE",
      "rollup": true
    }
  },
  "ioConfig": {
    "topic": "Reporting_Transformer",
    "consumerProperties": {
      "bootstrap.servers": "localhost:9092"
    }
  }
}
> Submitted the spec to the supervisor endpoint:
> curl -XPOST -H'Content-Type: application/json' -d @quickstart/reporting-transformerspec.json http://localhost:8090/druid/indexer/v1/supervisor
> Sent JSON data from Kafka to Druid (kafka-console-producer sends each input line as a separate message, so each JSON record must be typed on a single line):
> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Reporting_Transformer
{"payload":{"outboundMsg":{"taskEntities":[{"taskInstanceEntities":[{"taskName":"process123","createdDate":1566222246780}]}]}}}
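When I query the datasource, the result is empty.
Things to check:
- The timestamp key in each message must match the flattenSpec path, which reads `createdDate`. If it does not, the row has no timestamp and is dropped as unparseable.
- The parser "type" in the spec must equal the name under which the extension's DruidModule registers ExampleInputRowParser. The class's TYPE_NAME constant is "exampleParser".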
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]