This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new e6c2f5bd3b Refactor upsert quickstarts (#9246)
e6c2f5bd3b is described below
commit e6c2f5bd3ba02d87a329fde99882a6c01d69894b
Author: Xiang Fu <[email protected]>
AuthorDate: Fri Aug 19 14:42:06 2022 -0700
Refactor upsert quickstarts (#9246)
---
.../pinot/tools/PartialUpsertQuickStart.java | 75 +++----------------
.../org/apache/pinot/tools/QuickStartBase.java | 83 +++++++++++++++++++---
.../apache/pinot/tools/UpsertJsonQuickStart.java | 63 ++--------------
.../org/apache/pinot/tools/UpsertQuickStart.java | 59 ++-------------
.../pinot/tools/streams/RsvpSourceGenerator.java | 6 +-
...eHandling_meetupRsvp_realtime_table_config.json | 42 -----------
.../complexTypeHandling_meetupRsvp_schema.json | 63 ----------------
...psertJsonMeetupRsvp_realtime_table_config.json} | 4 +-
.../upsertJsonMeetupRsvp_schema.json} | 2 +-
.../upsertMeetupRsvp_realtime_table_config.json} | 4 +-
.../upsertMeetupRsvp_schema.json} | 2 +-
...rtPartialMeetupRsvp_realtime_table_config.json} | 4 +-
.../upsertPartialMeetupRsvp_schema.json} | 2 +-
13 files changed, 108 insertions(+), 301 deletions(-)
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/PartialUpsertQuickStart.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/PartialUpsertQuickStart.java
index a581d5c89a..2a35478ad5 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/PartialUpsertQuickStart.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/PartialUpsertQuickStart.java
@@ -18,21 +18,14 @@
*/
package org.apache.pinot.tools;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import java.io.File;
-import java.net.URL;
import java.util.Arrays;
import java.util.List;
-import org.apache.commons.io.FileUtils;
import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.tools.Quickstart.Color;
import org.apache.pinot.tools.admin.command.QuickstartRunner;
-import org.apache.pinot.tools.streams.MeetupRsvpStream;
-import org.apache.pinot.tools.utils.KafkaStarterUtils;
-public class PartialUpsertQuickStart extends QuickStartBase {
+public class PartialUpsertQuickStart extends RealtimeQuickStart {
public static void main(String[] args)
throws Exception {
@@ -41,64 +34,15 @@ public class PartialUpsertQuickStart extends QuickStartBase
{
}
@Override
- public List<String> types() {
- return Arrays.asList("PARTIAL-UPSERT", "PARTIAL_UPSERT");
- }
-
- // Todo: add a quick start demo
- public void execute()
+ public void runSampleQueries(QuickstartRunner runner)
throws Exception {
- File quickstartTmpDir = new File(FileUtils.getTempDirectory(),
String.valueOf(System.currentTimeMillis()));
- File bootstrapTableDir = new File(quickstartTmpDir, "meetupRsvp");
- File dataDir = new File(bootstrapTableDir, "data");
- Preconditions.checkState(dataDir.mkdirs());
-
- File schemaFile = new File(bootstrapTableDir, "meetupRsvp_schema.json");
- File tableConfigFile = new File(bootstrapTableDir,
"meetupRsvp_realtime_table_config.json");
-
- ClassLoader classLoader = Quickstart.class.getClassLoader();
- URL resource =
classLoader.getResource("examples/stream/meetupRsvp/upsert_partial_meetupRsvp_schema.json");
- Preconditions.checkNotNull(resource);
- FileUtils.copyURLToFile(resource, schemaFile);
- resource =
-
classLoader.getResource("examples/stream/meetupRsvp/upsert_partial_meetupRsvp_realtime_table_config.json");
- Preconditions.checkNotNull(resource);
- FileUtils.copyURLToFile(resource, tableConfigFile);
-
- QuickstartTableRequest request = new
QuickstartTableRequest(bootstrapTableDir.getAbsolutePath());
- final QuickstartRunner runner =
- new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 1, dataDir,
getConfigOverrides());
-
- startKafka();
- _kafkaStarter.createTopic("meetupRSVPEvents",
KafkaStarterUtils.getTopicCreationProps(2));
- printStatus(Color.CYAN, "***** Starting meetup data stream and publishing
to Kafka *****");
- MeetupRsvpStream meetupRSVPProvider = new MeetupRsvpStream(true);
- meetupRSVPProvider.run();
- printStatus(Color.CYAN, "***** Starting Zookeeper, controller, server and
broker *****");
- runner.startAll();
- Runtime.getRuntime().addShutdownHook(new Thread(() -> {
- try {
- printStatus(Color.GREEN, "***** Shutting down realtime quick start
*****");
- runner.stop();
- meetupRSVPProvider.stopPublishing();
- FileUtils.deleteDirectory(quickstartTmpDir);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }));
- printStatus(Color.CYAN, "***** Bootstrap meetupRSVP(upsert) table *****");
- runner.bootstrapTable();
- printStatus(Color.CYAN, "***** Waiting for 15 seconds for a few events to
get populated *****");
- Thread.sleep(15000);
-
- printStatus(Color.YELLOW, "***** Upsert quickstart setup complete *****");
-
// The expected behavior for total number of documents per PK should be 1.
// The expected behavior for total number of rsvp_counts per PK should >=1
since it's incremented and updated.
+
// The expected behavior for nums of values in group_name fields should
equals to rsvp_counts.
String q1 =
- "select event_id, count(*), sum(rsvp_count) from meetupRsvp group by
event_id order by sum(rsvp_count) desc "
- + "limit 10";
+ "select event_id, count(*), sum(rsvp_count) from
upsertPartialMeetupRsvp group by event_id order by sum"
+ + "(rsvp_count) desc limit 10";
printStatus(Color.YELLOW, "Total number of documents, total number of
rsvp_counts per event_id in the table");
printStatus(Color.YELLOW, "***** The expected behavior for total number of
documents per PK should be 1 *****");
printStatus(Color.YELLOW,
@@ -112,8 +56,8 @@ public class PartialUpsertQuickStart extends QuickStartBase {
// The expected behavior for nums of values in group_name fields should
equals to rsvp_counts.
String q2 =
- "select event_id, group_name, venue_name, rsvp_count from meetupRsvp
where rsvp_count > 1 order by rsvp_count"
- + " desc limit 10";
+ "select event_id, group_name, venue_name, rsvp_count from
upsertPartialMeetupRsvp where rsvp_count > 1 order "
+ + "by rsvp_count desc limit 10";
printStatus(Color.YELLOW, "Event_id, group_name, venue_name, rsvp_count
per per event_id in the table");
printStatus(Color.YELLOW,
"***** Nums of values in group_name fields should less than or equals
to rsvp_count. Duplicate records are "
@@ -123,7 +67,10 @@ public class PartialUpsertQuickStart extends QuickStartBase
{
printStatus(Color.CYAN, "Query : " + q2);
printStatus(Color.YELLOW, prettyPrintResponse(runner.runQuery(q2)));
printStatus(Color.GREEN,
"***************************************************");
+ }
- printStatus(Color.GREEN, "You can always go to http://localhost:9000 to
play around in the query console");
+ @Override
+ public List<String> types() {
+ return Arrays.asList("PARTIAL-UPSERT", "PARTIAL_UPSERT");
}
}
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java
index bab8c768cb..efd2510c21 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java
@@ -19,6 +19,7 @@
package org.apache.pinot.tools;
import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import java.io.File;
@@ -41,6 +42,7 @@ import org.apache.pinot.spi.stream.StreamDataServerStartable;
import org.apache.pinot.tools.admin.command.QuickstartRunner;
import org.apache.pinot.tools.streams.AirlineDataStream;
import org.apache.pinot.tools.streams.MeetupRsvpStream;
+import org.apache.pinot.tools.streams.RsvpSourceGenerator;
import org.apache.pinot.tools.utils.JarUtils;
import org.apache.pinot.tools.utils.KafkaStarterUtils;
import org.apache.pinot.tools.utils.PinotConfigUtils;
@@ -73,12 +75,16 @@ public abstract class QuickStartBase {
"examples/batch/githubComplexTypeEvents"
};
- protected static final Map<String, String> DEFAULT_STREAM_TABLE_DIRECTORIES
= ImmutableMap.of(
- "airlineStats", "examples/stream/airlineStats",
- "githubEvents", "examples/minions/stream/githubEvents",
- "meetupRsvp", "examples/stream/meetupRsvp",
- "meetupRsvpJson", "examples/stream/meetupRsvpJson",
- "meetupRsvpComplexType", "examples/stream/meetupRsvpComplexType");
+ protected static final Map<String, String> DEFAULT_STREAM_TABLE_DIRECTORIES
= ImmutableMap.<String, String>builder()
+ .put("airlineStats", "examples/stream/airlineStats")
+ .put("githubEvents", "examples/minions/stream/githubEvents")
+ .put("meetupRsvp", "examples/stream/meetupRsvp")
+ .put("meetupRsvpJson", "examples/stream/meetupRsvpJson")
+ .put("meetupRsvpComplexType", "examples/stream/meetupRsvpComplexType")
+ .put("upsertMeetupRsvp", "examples/stream/upsertMeetupRsvp")
+ .put("upsertJsonMeetupRsvp", "examples/stream/upsertJsonMeetupRsvp")
+ .put("upsertPartialMeetupRsvp",
"examples/stream/upsertPartialMeetupRsvp")
+ .build();
protected File _dataDir = FileUtils.getTempDirectory();
protected String[] _bootstrapDataDirs;
@@ -270,14 +276,14 @@ public abstract class QuickStartBase {
JsonNode columns =
response.get("resultTable").get("dataSchema").get("columnNames");
int numColumns = columns.size();
for (int i = 0; i < numColumns; i++) {
- responseBuilder.append(columns.get(i).asText()).append(TAB);
+ responseBuilder.append(jsonNode2String(columns.get(i))).append(TAB);
}
responseBuilder.append(NEW_LINE);
JsonNode rows = response.get("resultTable").get("rows");
for (int i = 0; i < rows.size(); i++) {
JsonNode row = rows.get(i);
for (int j = 0; j < numColumns; j++) {
- responseBuilder.append(row.get(j).asText()).append(TAB);
+ responseBuilder.append(jsonNode2String(row.get(j))).append(TAB);
}
responseBuilder.append(NEW_LINE);
}
@@ -285,6 +291,22 @@ public abstract class QuickStartBase {
return responseBuilder.toString();
}
+ private static String jsonNode2String(JsonNode jsonNode) {
+ if (jsonNode instanceof ArrayNode) {
+ ArrayNode arrayNode = (ArrayNode) jsonNode;
+ String result = "[";
+ for (int i = 0; i < arrayNode.size() - 1; i++) {
+ result += jsonNode2String(arrayNode.get(i)) + ", ";
+ }
+ if (arrayNode.size() > 0) {
+ result += jsonNode2String(arrayNode.get(arrayNode.size() - 1));
+ }
+ result += "]";
+ return result;
+ }
+ return jsonNode.asText();
+ }
+
protected Map<String, String> getDefaultStreamTableDirectories() {
return DEFAULT_STREAM_TABLE_DIRECTORIES;
}
@@ -400,6 +422,51 @@ public abstract class QuickStartBase {
}
}));
break;
+ case "upsertMeetupRsvp":
+ kafkaStarter.createTopic("upsertMeetupRSVPEvents",
KafkaStarterUtils.getTopicCreationProps(2));
+ printStatus(Quickstart.Color.CYAN,
+ "***** Starting upsertMeetupRSVPEvents data stream and
publishing to Kafka *****");
+ MeetupRsvpStream upsertMeetupRsvpProvider =
+ new MeetupRsvpStream("upsertMeetupRSVPEvents",
RsvpSourceGenerator.KeyColumn.EVENT_ID);
+ upsertMeetupRsvpProvider.run();
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ try {
+ upsertMeetupRsvpProvider.stopPublishing();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }));
+ break;
+ case "upsertJsonMeetupRsvp":
+ kafkaStarter.createTopic("upsertJsonMeetupRSVPEvents",
KafkaStarterUtils.getTopicCreationProps(2));
+ printStatus(Quickstart.Color.CYAN,
+ "***** Starting upsertJsonMeetupRSVPEvents data stream and
publishing to Kafka *****");
+ MeetupRsvpStream upsertJsonMeetupRsvpProvider =
+ new MeetupRsvpStream("upsertJsonMeetupRSVPEvents",
RsvpSourceGenerator.KeyColumn.RSVP_ID);
+ upsertJsonMeetupRsvpProvider.run();
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ try {
+ upsertJsonMeetupRsvpProvider.stopPublishing();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }));
+ break;
+ case "upsertPartialMeetupRsvp":
+ kafkaStarter.createTopic("upsertPartialMeetupRSVPEvents",
KafkaStarterUtils.getTopicCreationProps(2));
+ printStatus(Quickstart.Color.CYAN,
+ "***** Starting upsertPartialMeetupRSVPEvents data stream and
publishing to Kafka *****");
+ MeetupRsvpStream upsertPartialMeetupRsvpProvider =
+ new MeetupRsvpStream("upsertPartialMeetupRSVPEvents",
RsvpSourceGenerator.KeyColumn.EVENT_ID);
+ upsertPartialMeetupRsvpProvider.run();
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ try {
+ upsertPartialMeetupRsvpProvider.stopPublishing();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }));
+ break;
case "githubEvents":
kafkaStarter.createTopic("githubEvents",
KafkaStarterUtils.getTopicCreationProps(2));
printStatus(Quickstart.Color.CYAN, "***** Starting githubEvents data
stream and publishing to Kafka *****");
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertJsonQuickStart.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertJsonQuickStart.java
index 22af89ff27..27d4285c76 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertJsonQuickStart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertJsonQuickStart.java
@@ -18,23 +18,15 @@
*/
package org.apache.pinot.tools;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import java.io.File;
-import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import org.apache.commons.io.FileUtils;
import org.apache.pinot.tools.Quickstart.Color;
import org.apache.pinot.tools.admin.PinotAdministrator;
import org.apache.pinot.tools.admin.command.QuickstartRunner;
-import org.apache.pinot.tools.streams.MeetupRsvpStream;
-import org.apache.pinot.tools.streams.RsvpSourceGenerator;
-import org.apache.pinot.tools.utils.KafkaStarterUtils;
-public class UpsertJsonQuickStart extends QuickStartBase {
+public class UpsertJsonQuickStart extends RealtimeQuickStart {
@Override
public List<String> types() {
return Arrays.asList("UPSERT_JSON_INDEX", "UPSERT-JSON-INDEX");
@@ -48,59 +40,16 @@ public class UpsertJsonQuickStart extends QuickStartBase {
PinotAdministrator.main(arguments.toArray(new String[0]));
}
- public void execute()
+ @Override
+ public void runSampleQueries(QuickstartRunner runner)
throws Exception {
- File quickstartTmpDir = new File(_dataDir,
String.valueOf(System.currentTimeMillis()));
- File baseDir = new File(quickstartTmpDir, "meetupRsvp");
- File dataDir = new File(baseDir, "data");
- Preconditions.checkState(dataDir.mkdirs());
-
- File schemaFile = new File(baseDir, "meetupRsvp_schema.json");
- File tableConfigFile = new File(baseDir,
"meetupRsvp_realtime_table_config.json");
-
- ClassLoader classLoader = Quickstart.class.getClassLoader();
- URL resource =
classLoader.getResource("examples/stream/meetupRsvp/upsert_json_meetupRsvp_schema.json");
- Preconditions.checkNotNull(resource);
- FileUtils.copyURLToFile(resource, schemaFile);
- resource =
classLoader.getResource("examples/stream/meetupRsvp/upsert_json_meetupRsvp_realtime_table_config.json");
- Preconditions.checkNotNull(resource);
- FileUtils.copyURLToFile(resource, tableConfigFile);
-
- QuickstartTableRequest request = new
QuickstartTableRequest(baseDir.getAbsolutePath());
- QuickstartRunner runner =
- new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 1, dataDir,
getConfigOverrides());
-
- startKafka();
- _kafkaStarter.createTopic("meetupRSVPEvents",
KafkaStarterUtils.getTopicCreationProps(2));
- printStatus(Color.CYAN, "***** Starting meetup data stream and publishing
to Kafka *****");
- MeetupRsvpStream meetupRSVPProvider = new
MeetupRsvpStream(RsvpSourceGenerator.KeyColumn.RSVP_ID);
- meetupRSVPProvider.run();
- printStatus(Color.CYAN, "***** Starting Zookeeper, controller, server and
broker *****");
- runner.startAll();
- Runtime.getRuntime().addShutdownHook(new Thread(() -> {
- try {
- printStatus(Color.GREEN, "***** Shutting down realtime quick start
*****");
- runner.stop();
- meetupRSVPProvider.stopPublishing();
- FileUtils.deleteDirectory(quickstartTmpDir);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }));
- printStatus(Color.CYAN, "***** Bootstrap meetupRSVP table *****");
- runner.bootstrapTable();
- printStatus(Color.CYAN, "***** Waiting for 20 seconds for a few events to
get populated *****");
- Thread.sleep(20000);
-
printStatus(Color.YELLOW, "***** Upsert json-index quickstart setup
complete *****");
-
- String q1 = "select json_extract_scalar(event_json, '$.event_name',
'STRING') from meetupRsvp where json_match"
- + "(group_json, '\"$.group_topics[*].topic_name\"=''Fitness''') limit
10";
+ String q1 =
+ "select json_extract_scalar(event_json, '$.event_name', 'STRING') from
upsertJsonMeetupRsvp where json_match"
+ + "(group_json,
'\"$.group_topics[*].topic_name\"=''topic_name0''') limit 10";
printStatus(Color.YELLOW, "Events related to fitness");
printStatus(Color.CYAN, "Query : " + q1);
printStatus(Color.YELLOW, prettyPrintResponse(runner.runQuery(q1)));
-
printStatus(Color.GREEN,
"***************************************************");
- printStatus(Color.GREEN, "You can always go to http://localhost:9000 to
play around in the query console");
}
}
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertQuickStart.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertQuickStart.java
index d67bbcdaf6..f8dfb3953b 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertQuickStart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertQuickStart.java
@@ -18,23 +18,16 @@
*/
package org.apache.pinot.tools;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import java.io.File;
-import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import org.apache.commons.io.FileUtils;
import org.apache.pinot.tools.Quickstart.Color;
import org.apache.pinot.tools.admin.PinotAdministrator;
import org.apache.pinot.tools.admin.command.QuickstartRunner;
-import org.apache.pinot.tools.streams.MeetupRsvpStream;
-import org.apache.pinot.tools.utils.KafkaStarterUtils;
-public class UpsertQuickStart extends QuickStartBase {
+public class UpsertQuickStart extends RealtimeQuickStart {
@Override
public List<String> types() {
return Collections.singletonList("UPSERT");
@@ -48,58 +41,14 @@ public class UpsertQuickStart extends QuickStartBase {
PinotAdministrator.main(arguments.toArray(new String[arguments.size()]));
}
- public void execute()
+ @Override
+ public void runSampleQueries(QuickstartRunner runner)
throws Exception {
- File quickstartTmpDir = new File(_dataDir,
String.valueOf(System.currentTimeMillis()));
- File bootstrapTableDir = new File(quickstartTmpDir, "meetupRsvp");
- File dataDir = new File(bootstrapTableDir, "data");
- Preconditions.checkState(dataDir.mkdirs());
-
- File schemaFile = new File(bootstrapTableDir, "meetupRsvp_schema.json");
- File tableConfigFile = new File(bootstrapTableDir,
"meetupRsvp_realtime_table_config.json");
-
- ClassLoader classLoader = Quickstart.class.getClassLoader();
- URL resource =
classLoader.getResource("examples/stream/meetupRsvp/upsert_meetupRsvp_schema.json");
- com.google.common.base.Preconditions.checkNotNull(resource);
- FileUtils.copyURLToFile(resource, schemaFile);
- resource =
classLoader.getResource("examples/stream/meetupRsvp/upsert_meetupRsvp_realtime_table_config.json");
- com.google.common.base.Preconditions.checkNotNull(resource);
- FileUtils.copyURLToFile(resource, tableConfigFile);
-
- QuickstartTableRequest request = new
QuickstartTableRequest(bootstrapTableDir.getAbsolutePath());
- final QuickstartRunner runner
- = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 1,
dataDir, getConfigOverrides());
-
- startKafka();
- _kafkaStarter.createTopic("meetupRSVPEvents",
KafkaStarterUtils.getTopicCreationProps(2));
- printStatus(Color.CYAN, "***** Starting meetup data stream and publishing
to Kafka *****");
- MeetupRsvpStream meetupRSVPProvider = new MeetupRsvpStream(true);
- meetupRSVPProvider.run();
- printStatus(Color.CYAN, "***** Starting Zookeeper, controller, server and
broker *****");
- runner.startAll();
- Runtime.getRuntime().addShutdownHook(new Thread(() -> {
- try {
- printStatus(Color.GREEN, "***** Shutting down realtime quick start
*****");
- runner.stop();
- meetupRSVPProvider.stopPublishing();
- FileUtils.deleteDirectory(quickstartTmpDir);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }));
- printStatus(Color.CYAN, "***** Bootstrap meetupRSVP(upsert) table *****");
- runner.bootstrapTable();
- printStatus(Color.CYAN, "***** Waiting for 5 seconds for a few events to
get populated *****");
- Thread.sleep(5000);
-
printStatus(Color.YELLOW, "***** Upsert quickstart setup complete *****");
-
- String q1 = "select event_id, count(*) from meetupRsvp group by event_id
limit 10";
+ String q1 = "select event_id, count(*) from upsertMeetupRsvp group by
event_id limit 10";
printStatus(Color.YELLOW, "Total number of documents per event_id in the
table");
printStatus(Color.CYAN, "Query : " + q1);
printStatus(Color.YELLOW, prettyPrintResponse(runner.runQuery(q1)));
printStatus(Color.GREEN,
"***************************************************");
-
- printStatus(Color.GREEN, "You can always go to http://localhost:9000 to
play around in the query console");
}
}
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/RsvpSourceGenerator.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/RsvpSourceGenerator.java
index cf04a9acbe..a1474d18f5 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/RsvpSourceGenerator.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/RsvpSourceGenerator.java
@@ -49,7 +49,7 @@ public class RsvpSourceGenerator implements
PinotSourceDataGenerator {
}
public RSVP createMessage() {
- String eventId = Math.abs(ThreadLocalRandom.current().nextLong()) + "";
+ String eventId = Math.abs(ThreadLocalRandom.current().nextLong(100)) + "";
ObjectNode json = JsonUtils.newObjectNode();
ObjectNode eventJson = JsonUtils.newObjectNode();
json.set("event", eventJson);
@@ -104,10 +104,10 @@ public class RsvpSourceGenerator implements
PinotSourceDataGenerator {
json.put("mtime", DateTime.now().getMillis());
- json.put("rsvp_id", ThreadLocalRandom.current().nextLong());
+ json.put("rsvp_id", ThreadLocalRandom.current().nextLong(100));
json.put("guests", ThreadLocalRandom.current().nextInt(100));
- json.put("rsvp_count", 1);
+ json.put("rsvp_count", ThreadLocalRandom.current().nextInt(10) + 1);
return new RSVP(eventId, eventId, json);
}
diff --git
a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/complexTypeHandling_meetupRsvp_realtime_table_config.json
b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/complexTypeHandling_meetupRsvp_realtime_table_config.json
deleted file mode 100644
index b23f595fba..0000000000
---
a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/complexTypeHandling_meetupRsvp_realtime_table_config.json
+++ /dev/null
@@ -1,42 +0,0 @@
-{
- "tableName": "meetupRsvp",
- "tableType": "REALTIME",
- "tenants": {},
- "segmentsConfig": {
- "timeColumnName": "mtime",
- "timeType": "MILLISECONDS",
- "segmentPushType": "APPEND",
- "replicasPerPartition": "1",
- "retentionTimeUnit": "DAYS",
- "retentionTimeValue": "1"
- },
- "ingestionConfig": {
- "streamIngestionConfig": {
- "streamConfigMaps": [
- {
- "streamType": "kafka",
- "stream.kafka.consumer.type": "lowLevel",
- "stream.kafka.topic.name": "meetupRSVPEvents",
- "stream.kafka.decoder.class.name":
"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
- "stream.kafka.hlc.zk.connect.string": "localhost:2191/kafka",
- "stream.kafka.consumer.factory.class.name":
"org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
- "stream.kafka.zk.broker.url": "localhost:2191/kafka",
- "stream.kafka.broker.list": "localhost:19092"
- }
- ]
- },
- "transformConfigs": [
- ],
- "complexTypeConfig": {
- "fieldsToUnnest": [
- "group.group_topics"
- ]
- }
- },
- "tableIndexConfig": {
- "loadMode": "MMAP"
- },
- "metadata": {
- "customConfigs": {}
- }
-}
diff --git
a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/complexTypeHandling_meetupRsvp_schema.json
b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/complexTypeHandling_meetupRsvp_schema.json
deleted file mode 100644
index 7259f30204..0000000000
---
a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/complexTypeHandling_meetupRsvp_schema.json
+++ /dev/null
@@ -1,63 +0,0 @@
-{
- "schemaName": "meetupRsvp",
- "dimensionFieldSpecs": [
- {
- "name": "group.group_topics.urlkey",
- "dataType": "STRING"
- },
- {
- "name": "group.group_topics.topic_name",
- "dataType": "STRING"
- },
- {
- "name": "group.group_city",
- "dataType": "STRING"
- },
- {
- "name": "group.group_country",
- "dataType": "STRING"
- },
- {
- "name": "group.group_id",
- "dataType": "INT"
- },
- {
- "name": "group.group_name",
- "dataType": "STRING"
- },
- {
- "name": "group.group_lon",
- "dataType": "DOUBLE"
- },
- {
- "name": "group.group_urlname",
- "dataType": "STRING"
- },
- {
- "name": "group.group_state",
- "dataType": "STRING"
- },
- {
- "name": "group.group_lat",
- "dataType": "DOUBLE"
- },
- {
- "name": "rsvp_id",
- "dataType": "LONG"
- }
- ],
- "metricFieldSpecs": [
- {
- "name": "guests",
- "dataType": "INT"
- }
- ],
- "dateTimeFieldSpecs": [
- {
- "name": "mtime",
- "dataType": "LONG",
- "format": "1:MILLISECONDS:EPOCH",
- "granularity": "1:MILLISECONDS"
- }
- ]
-}
diff --git
a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_json_meetupRsvp_realtime_table_config.json
b/pinot-tools/src/main/resources/examples/stream/upsertJsonMeetupRsvp/upsertJsonMeetupRsvp_realtime_table_config.json
similarity index 95%
rename from
pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_json_meetupRsvp_realtime_table_config.json
rename to
pinot-tools/src/main/resources/examples/stream/upsertJsonMeetupRsvp/upsertJsonMeetupRsvp_realtime_table_config.json
index 149587ee21..baa77af751 100644
---
a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_json_meetupRsvp_realtime_table_config.json
+++
b/pinot-tools/src/main/resources/examples/stream/upsertJsonMeetupRsvp/upsertJsonMeetupRsvp_realtime_table_config.json
@@ -1,5 +1,5 @@
{
- "tableName": "meetupRsvp",
+ "tableName": "upsertJsonMeetupRsvp",
"tableType": "REALTIME",
"tenants": {},
"segmentsConfig": {
@@ -16,7 +16,7 @@
{
"streamType": "kafka",
"stream.kafka.consumer.type": "lowLevel",
- "stream.kafka.topic.name": "meetupRSVPEvents",
+ "stream.kafka.topic.name": "upsertJsonMeetupRSVPEvents",
"stream.kafka.decoder.class.name":
"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"stream.kafka.hlc.zk.connect.string": "localhost:2191/kafka",
"stream.kafka.consumer.factory.class.name":
"org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
diff --git
a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_json_meetupRsvp_schema.json
b/pinot-tools/src/main/resources/examples/stream/upsertJsonMeetupRsvp/upsertJsonMeetupRsvp_schema.json
similarity index 95%
rename from
pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_json_meetupRsvp_schema.json
rename to
pinot-tools/src/main/resources/examples/stream/upsertJsonMeetupRsvp/upsertJsonMeetupRsvp_schema.json
index 9112c73b3c..7085802479 100644
---
a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_json_meetupRsvp_schema.json
+++
b/pinot-tools/src/main/resources/examples/stream/upsertJsonMeetupRsvp/upsertJsonMeetupRsvp_schema.json
@@ -1,5 +1,5 @@
{
- "schemaName": "meetupRsvp",
+ "schemaName": "upsertJsonMeetupRsvp",
"dimensionFieldSpecs": [
{
"name": "event_json",
diff --git
a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_meetupRsvp_realtime_table_config.json
b/pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/upsertMeetupRsvp_realtime_table_config.json
similarity index 94%
rename from
pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_meetupRsvp_realtime_table_config.json
rename to
pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/upsertMeetupRsvp_realtime_table_config.json
index aecdde0d28..d29431c683 100644
---
a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_meetupRsvp_realtime_table_config.json
+++
b/pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/upsertMeetupRsvp_realtime_table_config.json
@@ -1,5 +1,5 @@
{
- "tableName": "meetupRsvp",
+ "tableName": "upsertMeetupRsvp",
"tableType": "REALTIME",
"segmentsConfig": {
"timeColumnName": "mtime",
@@ -21,7 +21,7 @@
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.consumer.type": "lowLevel",
- "stream.kafka.topic.name": "meetupRSVPEvents",
+ "stream.kafka.topic.name": "upsertMeetupRSVPEvents",
"stream.kafka.decoder.class.name":
"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"stream.kafka.hlc.zk.connect.string": "localhost:2191/kafka",
"stream.kafka.consumer.factory.class.name":
"org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
diff --git
a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_meetupRsvp_schema.json
b/pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/upsertMeetupRsvp_schema.json
similarity index 97%
rename from
pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_meetupRsvp_schema.json
rename to
pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/upsertMeetupRsvp_schema.json
index 4c88f5d945..bb69cc2093 100644
---
a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_meetupRsvp_schema.json
+++
b/pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/upsertMeetupRsvp_schema.json
@@ -60,7 +60,7 @@
"granularity": "1:MILLISECONDS"
}
],
- "schemaName": "meetupRsvp",
+ "schemaName": "upsertMeetupRsvp",
"primaryKeyColumns": [
"event_id"
]
diff --git
a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_partial_meetupRsvp_realtime_table_config.json
b/pinot-tools/src/main/resources/examples/stream/upsertPartialMeetupRsvp/upsertPartialMeetupRsvp_realtime_table_config.json
similarity index 93%
rename from
pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_partial_meetupRsvp_realtime_table_config.json
rename to
pinot-tools/src/main/resources/examples/stream/upsertPartialMeetupRsvp/upsertPartialMeetupRsvp_realtime_table_config.json
index 647c6f4080..2ebf3bb379 100644
---
a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_partial_meetupRsvp_realtime_table_config.json
+++
b/pinot-tools/src/main/resources/examples/stream/upsertPartialMeetupRsvp/upsertPartialMeetupRsvp_realtime_table_config.json
@@ -1,5 +1,5 @@
{
- "tableName": "meetupRsvp",
+ "tableName": "upsertPartialMeetupRsvp",
"tableType": "REALTIME",
"segmentsConfig": {
"timeColumnName": "mtime",
@@ -18,7 +18,7 @@
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.consumer.type": "lowLevel",
- "stream.kafka.topic.name": "meetupRSVPEvents",
+ "stream.kafka.topic.name": "upsertPartialMeetupRSVPEvents",
"stream.kafka.decoder.class.name":
"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"stream.kafka.hlc.zk.connect.string": "localhost:2191/kafka",
"stream.kafka.consumer.factory.class.name":
"org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
diff --git
a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_partial_meetupRsvp_schema.json
b/pinot-tools/src/main/resources/examples/stream/upsertPartialMeetupRsvp/upsertPartialMeetupRsvp_schema.json
similarity index 96%
rename from
pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_partial_meetupRsvp_schema.json
rename to
pinot-tools/src/main/resources/examples/stream/upsertPartialMeetupRsvp/upsertPartialMeetupRsvp_schema.json
index ba144ce7d5..b17d454b0d 100644
---
a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_partial_meetupRsvp_schema.json
+++
b/pinot-tools/src/main/resources/examples/stream/upsertPartialMeetupRsvp/upsertPartialMeetupRsvp_schema.json
@@ -62,7 +62,7 @@
"granularity": "1:MILLISECONDS"
}
],
- "schemaName": "meetupRsvp",
+ "schemaName": "upsertPartialMeetupRsvp",
"primaryKeyColumns": [
"event_id"
]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]