This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d41ec0bfd4 Refactor stream quickstart (#9227)
d41ec0bfd4 is described below
commit d41ec0bfd4282ff2121ecb9b1b36af48c1890c1e
Author: Xiang Fu <[email protected]>
AuthorDate: Wed Aug 17 11:24:34 2022 -0700
Refactor stream quickstart (#9227)
---
.../kafka20/server/KafkaDataServerStartable.java | 31 ++++
.../org/apache/pinot/tools/GenericQuickstart.java | 29 ----
.../apache/pinot/tools/GitHubEventsQuickstart.java | 28 ++--
.../org/apache/pinot/tools/HybridQuickstart.java | 165 +++++++--------------
.../pinot/tools/PartialUpsertQuickStart.java | 18 +--
.../org/apache/pinot/tools/QuickStartBase.java | 164 ++++++++++++++++++++
.../apache/pinot/tools/QuickstartTableRequest.java | 21 +++
.../RealtimeComplexTypeHandlingQuickStart.java | 83 +----------
.../pinot/tools/RealtimeJsonIndexQuickStart.java | 92 ++----------
.../org/apache/pinot/tools/RealtimeQuickStart.java | 102 +++++--------
.../pinot/tools/RealtimeQuickStartWithMinion.java | 126 ++--------------
.../apache/pinot/tools/UpsertJsonQuickStart.java | 19 +--
.../org/apache/pinot/tools/UpsertQuickStart.java | 19 +--
.../pinot/tools/streams/AirlineDataStream.java | 13 +-
.../pinot/tools/streams/MeetupRsvpStream.java | 18 ++-
.../pinot/tools/streams/RsvpSourceGenerator.java | 63 +++++++-
.../airlineStats_realtime_table_config.json | 26 ++++
.../stream/airlineStats/airlineStats_schema.json | 12 ++
.../airlineStats_data.avro | Bin
.../airlineStats_data.json | 0
.../{sample_data => rawdata}/airlineStats_data.orc | Bin
.../meetupRsvp_realtime_table_config.json | 18 ++-
.../meetupRsvp_realtime_table_config.json | 18 ++-
...etupRsvpComplexType_realtime_table_config.json} | 41 ++---
.../meetupRsvpComplexType_schema.json | 63 ++++++++
.../meetupRsvpJson_realtime_table_config.json} | 4 +-
.../meetupRsvpJson_schema.json} | 2 +-
27 files changed, 575 insertions(+), 600 deletions(-)
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/server/KafkaDataServerStartable.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/server/KafkaDataServerStartable.java
index 42bbd9be8d..9bd13c43ae 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/server/KafkaDataServerStartable.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/server/KafkaDataServerStartable.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.plugin.stream.kafka20.server;
+import com.google.common.base.Function;
import java.io.File;
import java.util.Arrays;
import java.util.Collection;
@@ -25,6 +26,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
+import javax.annotation.Nullable;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import org.I0Itec.zkclient.ZkClient;
@@ -96,10 +98,39 @@ public class KafkaDataServerStartable implements
StreamDataServerStartable {
int partition = (Integer) props.get("partition");
Collection<NewTopic> topicList = Arrays.asList(new NewTopic(topic,
partition, (short) 1));
_adminClient.createTopics(topicList);
+ waitForCondition(new Function<Void, Boolean>() {
+ @Nullable
+ @Override
+ public Boolean apply(@Nullable Void aVoid) {
+ try {
+ return _adminClient.listTopics().names().get().contains(topic);
+ } catch (Exception e) {
+ LOGGER.warn("Could not fetch Kafka topics", e);
+ return null;
+ }
+ }
+ }, 1000L, 30000, "Kafka topic " + topic + " is not created yet");
}
@Override
public int getPort() {
return _port;
}
+
+ private static void waitForCondition(Function<Void, Boolean> condition, long
checkIntervalMs, long timeoutMs,
+ @Nullable String errorMessage) {
+ long endTime = System.currentTimeMillis() + timeoutMs;
+ String errorMessageSuffix = errorMessage != null ? ", error message: " +
errorMessage : "";
+ while (System.currentTimeMillis() < endTime) {
+ try {
+ if (Boolean.TRUE.equals(condition.apply(null))) {
+ return;
+ }
+ Thread.sleep(checkIntervalMs);
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while checking the condition" +
errorMessageSuffix, e);
+ }
+ }
+ LOGGER.error("Failed to meet condition in " + timeoutMs + "ms" +
errorMessageSuffix);
+ }
}
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/GenericQuickstart.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/GenericQuickstart.java
index 62fc47d6b3..b4b8f295cd 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/GenericQuickstart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/GenericQuickstart.java
@@ -20,11 +20,7 @@ package org.apache.pinot.tools;
import java.util.Arrays;
import java.util.List;
-import org.apache.pinot.common.utils.ZkStarter;
-import org.apache.pinot.spi.stream.StreamDataProvider;
-import org.apache.pinot.spi.stream.StreamDataServerStartable;
import org.apache.pinot.tools.admin.command.QuickstartRunner;
-import org.apache.pinot.tools.utils.KafkaStarterUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,24 +39,10 @@ import org.slf4j.LoggerFactory;
*/
public class GenericQuickstart extends Quickstart {
private static final Logger LOGGER =
LoggerFactory.getLogger(GenericQuickstart.class);
- private StreamDataServerStartable _kafkaStarter;
- private ZkStarter.ZookeeperInstance _zookeeperInstance;
public GenericQuickstart() {
}
- private void startKafka() {
- _zookeeperInstance = ZkStarter.startLocalZkServer();
- try {
- _kafkaStarter =
StreamDataProvider.getServerDataStartable(KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME,
- KafkaStarterUtils.getDefaultKafkaConfiguration(_zookeeperInstance));
- } catch (Exception e) {
- throw new RuntimeException("Failed to start " +
KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, e);
- }
- _kafkaStarter.start();
- _kafkaStarter.createTopic("pullRequestMergedEvents",
KafkaStarterUtils.getTopicCreationProps(2));
- }
-
@Override
public List<String> types() {
return Arrays.asList("GENERIC");
@@ -92,18 +74,7 @@ public class GenericQuickstart extends Quickstart {
public void execute()
throws Exception {
- printStatus(Color.CYAN, "***** Starting Kafka *****");
startKafka();
-
- Runtime.getRuntime().addShutdownHook(new Thread(() -> {
- try {
- printStatus(Color.GREEN, "***** Shutting down QuickStart cluster
*****");
- _kafkaStarter.stop();
- ZkStarter.stopLocalZkServer(_zookeeperInstance);
- } catch (Exception e) {
- LOGGER.error("Caught exception in shutting down QuickStart cluster",
e);
- }
- }));
super.execute();
}
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/GitHubEventsQuickstart.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/GitHubEventsQuickstart.java
index 47a1a9417f..8765333b96 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/GitHubEventsQuickstart.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/GitHubEventsQuickstart.java
@@ -26,20 +26,16 @@ import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.utils.ZkStarter;
import org.apache.pinot.spi.stream.StreamDataProvider;
import org.apache.pinot.spi.stream.StreamDataServerStartable;
import org.apache.pinot.tools.Quickstart.Color;
import org.apache.pinot.tools.admin.command.QuickstartRunner;
import
org.apache.pinot.tools.streams.githubevents.PullRequestMergedEventsStream;
-import org.apache.pinot.tools.utils.KafkaStarterUtils;
import org.apache.pinot.tools.utils.KinesisStarterUtils;
import org.apache.pinot.tools.utils.StreamSourceType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.pinot.tools.Quickstart.prettyPrintResponse;
-
/**
* Sets up a demo Pinot cluster with 1 zookeeper, 1 controller, 1 broker and 1
server
@@ -50,25 +46,12 @@ import static
org.apache.pinot.tools.Quickstart.prettyPrintResponse;
public class GitHubEventsQuickstart extends QuickStartBase {
private static final Logger LOGGER =
LoggerFactory.getLogger(GitHubEventsQuickstart.class);
private StreamDataServerStartable _serverStarter;
- private ZkStarter.ZookeeperInstance _zookeeperInstance;
private String _personalAccessToken;
private StreamSourceType _sourceType;
public GitHubEventsQuickstart() {
}
- private void startKafka() {
- _zookeeperInstance = ZkStarter.startLocalZkServer();
- try {
- _serverStarter =
StreamDataProvider.getServerDataStartable(KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME,
- KafkaStarterUtils.getDefaultKafkaConfiguration(_zookeeperInstance));
- } catch (Exception e) {
- throw new RuntimeException("Failed to start " +
KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, e);
- }
- _serverStarter.start();
- _serverStarter.createTopic("pullRequestMergedEvents",
KafkaStarterUtils.getTopicCreationProps(2));
- }
-
private void startKinesis() {
try {
@@ -85,6 +68,15 @@ public class GitHubEventsQuickstart extends QuickStartBase {
Properties topicProperties = new Properties();
topicProperties.put(KinesisStarterUtils.NUM_SHARDS, 3);
_serverStarter.createTopic("pullRequestMergedEvents", topicProperties);
+
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ try {
+ printStatus(Color.GREEN, "***** Shutting down Kinesis *****");
+ _serverStarter.stop();
+ } catch (Exception e) {
+ LOGGER.error("Caught exception in shutting down Kinesis", e);
+ }
+ }));
}
private void startStreamServer() {
@@ -148,8 +140,6 @@ public class GitHubEventsQuickstart extends QuickStartBase {
try {
printStatus(Color.GREEN, "***** Shutting down GitHubEventsQuickStart
*****");
runner.stop();
- _serverStarter.stop();
- ZkStarter.stopLocalZkServer(_zookeeperInstance);
FileUtils.deleteDirectory(quickStartDataDir);
} catch (Exception e) {
LOGGER.error("Caught exception in shutting down GitHubEvents
QuickStart", e);
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
index 3ef0945a75..c65fa0a347 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
@@ -19,45 +19,27 @@
package org.apache.pinot.tools;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
+import com.google.common.collect.ImmutableMap;
import java.io.File;
-import java.io.IOException;
-import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.utils.ZkStarter;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.stream.StreamDataProvider;
-import org.apache.pinot.spi.stream.StreamDataServerStartable;
-import org.apache.pinot.spi.utils.JsonUtils;
-import org.apache.pinot.tools.Quickstart.Color;
import org.apache.pinot.tools.admin.PinotAdministrator;
import org.apache.pinot.tools.admin.command.QuickstartRunner;
-import org.apache.pinot.tools.streams.AirlineDataStream;
-import org.apache.pinot.tools.utils.KafkaStarterUtils;
-import static org.apache.pinot.tools.Quickstart.prettyPrintResponse;
-
-public class HybridQuickstart extends QuickStartBase {
+public class HybridQuickstart extends Quickstart {
@Override
public List<String> types() {
return Collections.singletonList("HYBRID");
}
- private StreamDataServerStartable _kafkaStarter;
- private ZkStarter.ZookeeperInstance _zookeeperInstance;
- private File _schemaFile;
- private File _realtimeTableConfigFile;
- private File _dataFile;
- private File _ingestionJobSpecFile;
-
public static void main(String[] args)
throws Exception {
List<String> arguments = new ArrayList<>();
@@ -73,97 +55,9 @@ public class HybridQuickstart extends QuickStartBase {
return overrides;
}
- private QuickstartTableRequest prepareTableRequest(File baseDir)
- throws IOException {
- _schemaFile = new File(baseDir, "airlineStats_schema.json");
- _ingestionJobSpecFile = new File(baseDir, "ingestionJobSpec.yaml");
- File tableConfigFile = new File(baseDir,
"airlineStats_offline_table_config.json");
-
- ClassLoader classLoader = Quickstart.class.getClassLoader();
- URL resource =
classLoader.getResource("examples/batch/airlineStats/airlineStats_schema.json");
- Preconditions.checkNotNull(resource);
- FileUtils.copyURLToFile(resource, _schemaFile);
- resource =
classLoader.getResource("examples/batch/airlineStats/ingestionJobSpec.yaml");
- Preconditions.checkNotNull(resource);
- FileUtils.copyURLToFile(resource, _ingestionJobSpecFile);
- resource =
classLoader.getResource("examples/batch/airlineStats/airlineStats_offline_table_config.json");
- Preconditions.checkNotNull(resource);
- FileUtils.copyURLToFile(resource, tableConfigFile);
-
- _realtimeTableConfigFile = new File(baseDir,
"airlineStats_realtime_table_config.json");
- resource = Quickstart.class.getClassLoader()
-
.getResource("examples/stream/airlineStats/airlineStats_realtime_table_config.json");
- Preconditions.checkNotNull(resource);
- FileUtils.copyURLToFile(resource, _realtimeTableConfigFile);
- resource = Quickstart.class.getClassLoader()
-
.getResource("examples/stream/airlineStats/sample_data/airlineStats_data.avro");
- Preconditions.checkNotNull(resource);
- _dataFile = new File(baseDir, "airlineStats_data.avro");
- FileUtils.copyURLToFile(resource, _dataFile);
-
- return new QuickstartTableRequest(baseDir.getAbsolutePath());
- }
-
- private void startKafka() {
- _zookeeperInstance = ZkStarter.startLocalZkServer();
- try {
- _kafkaStarter =
StreamDataProvider.getServerDataStartable(KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME,
- KafkaStarterUtils.getDefaultKafkaConfiguration(_zookeeperInstance));
- } catch (Exception e) {
- throw new RuntimeException("Failed to start " +
KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, e);
- }
- _kafkaStarter.start();
- _kafkaStarter.createTopic("flights-realtime",
KafkaStarterUtils.getTopicCreationProps(10));
- }
-
- public void execute()
+ @Override
+ public void runSampleQueries(QuickstartRunner runner)
throws Exception {
- File quickstartTmpDir = new File(_dataDir,
String.valueOf(System.currentTimeMillis()));
- File baseDir = new File(quickstartTmpDir, "airlineStats");
- File dataDir = new File(baseDir, "data");
- Preconditions.checkState(dataDir.mkdirs());
- QuickstartTableRequest bootstrapTableRequest =
prepareTableRequest(baseDir);
- final QuickstartRunner runner = new
QuickstartRunner(Lists.newArrayList(bootstrapTableRequest),
- 1, 1, 1, 1, dataDir, getConfigOverrides());
- printStatus(Color.YELLOW, "***** Starting Kafka *****");
- startKafka();
- printStatus(Color.YELLOW, "***** Starting airline data stream and
publishing to Kafka *****");
- Schema schema = Schema.fromFile(_schemaFile);
- TableConfig tableConfig = JsonUtils.fileToObject(_realtimeTableConfigFile,
TableConfig.class);
- AirlineDataStream stream = new AirlineDataStream(schema, tableConfig,
_dataFile);
- stream.run();
- printStatus(Color.YELLOW, "***** Starting Zookeeper, 1 servers, 1 brokers
and 1 controller *****");
- runner.startAll();
- Runtime.getRuntime().addShutdownHook(new Thread(() -> {
- try {
- printStatus(Color.GREEN, "***** Shutting down hybrid quick start
*****");
- runner.stop();
- stream.shutdown();
- _kafkaStarter.stop();
- ZkStarter.stopLocalZkServer(_zookeeperInstance);
- FileUtils.deleteDirectory(quickstartTmpDir);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }));
- printStatus(Color.YELLOW, "***** Bootstrap airlineStats offline and
realtime table *****");
- runner.bootstrapTable();
-
- printStatus(Color.YELLOW, "***** Pinot Hybrid with hybrid table setup is
complete *****");
- printStatus(Color.YELLOW, "***** Sequence of operations *****");
- printStatus(Color.YELLOW, "***** 1. Started 1 controller instance where
tenant creation is enabled *****");
- printStatus(Color.YELLOW, "***** 2. Started 2 servers and 2 brokers
*****");
- printStatus(Color.YELLOW, "***** 3. Created a server tenant with 1
offline and 1 realtime instance *****");
- printStatus(Color.YELLOW, "***** 4. Created a broker tenant with 2
instances *****");
- printStatus(Color.YELLOW, "***** 5. Added a schema *****");
- printStatus(Color.YELLOW,
- "***** 6. Created an offline and a realtime table with the tenant
names created above *****");
- printStatus(Color.YELLOW, "***** 7. Built and pushed an offline segment
*****");
- printStatus(Color.YELLOW,
- "***** 8. Started publishing a Kafka stream for the realtime
instance to start consuming *****");
- printStatus(Color.YELLOW, "***** 9. Sleep 5 Seconds to wait for all
components brought up *****");
- Thread.sleep(5000);
-
String q1 = "select count(*) from airlineStats limit 1";
printStatus(Color.YELLOW, "Total number of documents in the table");
printStatus(Color.CYAN, "Query : " + q1);
@@ -198,6 +92,53 @@ public class HybridQuickstart extends QuickStartBase {
printStatus(Color.CYAN, "Query : " + q5);
printStatus(Color.YELLOW, prettyPrintResponse(runner.runQuery(q5)));
printStatus(Color.GREEN,
"***************************************************");
+ }
+
+ @Override
+ protected String[] getDefaultBatchTableDirectories() {
+ return new String[]{"examples/batch/airlineStats"};
+ }
+
+ @Override
+ protected Map<String, String> getDefaultStreamTableDirectories() {
+ return ImmutableMap.of("airlineStats", "examples/stream/airlineStats");
+ }
+
+ public void execute()
+ throws Exception {
+ File quickstartTmpDir = new File(_dataDir,
String.valueOf(System.currentTimeMillis()));
+ File quickstartRunnerDir = new File(quickstartTmpDir, "quickstart");
+ Preconditions.checkState(quickstartRunnerDir.mkdirs());
+ Set<QuickstartTableRequest> quickstartTableRequests = new HashSet<>();
+
quickstartTableRequests.addAll(bootstrapOfflineTableDirectories(quickstartTmpDir));
+
quickstartTableRequests.addAll(bootstrapStreamTableDirectories(quickstartTmpDir));
+ final QuickstartRunner runner =
+ new QuickstartRunner(new ArrayList<>(quickstartTableRequests), 1, 1,
1, 1, quickstartRunnerDir,
+ getConfigOverrides());
+
+ startKafka();
+ startAllDataStreams(_kafkaStarter, quickstartTmpDir);
+
+ printStatus(Color.CYAN, "***** Starting Zookeeper, controller, broker,
server and minion *****");
+ runner.startAll();
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ try {
+ printStatus(Color.GREEN, "***** Shutting down realtime quick start
*****");
+ runner.stop();
+ FileUtils.deleteDirectory(quickstartTmpDir);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }));
+
+ printStatus(Color.CYAN, "***** Bootstrap all tables *****");
+ runner.bootstrapTable();
+
+ printStatus(Color.CYAN, "***** Waiting for 5 seconds for a few events to
get populated *****");
+ Thread.sleep(5000);
+
+ printStatus(Color.YELLOW, "***** Realtime quickstart setup complete
*****");
+ runSampleQueries(runner);
printStatus(Color.GREEN, "You can always go to http://localhost:9000 to
play around in the query console");
}
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/PartialUpsertQuickStart.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/PartialUpsertQuickStart.java
index 19fd63e72b..a581d5c89a 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/PartialUpsertQuickStart.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/PartialUpsertQuickStart.java
@@ -25,20 +25,14 @@ import java.net.URL;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.utils.ZkStarter;
import org.apache.pinot.spi.plugin.PluginManager;
-import org.apache.pinot.spi.stream.StreamDataProvider;
-import org.apache.pinot.spi.stream.StreamDataServerStartable;
import org.apache.pinot.tools.Quickstart.Color;
import org.apache.pinot.tools.admin.command.QuickstartRunner;
import org.apache.pinot.tools.streams.MeetupRsvpStream;
import org.apache.pinot.tools.utils.KafkaStarterUtils;
-import static org.apache.pinot.tools.Quickstart.prettyPrintResponse;
-
public class PartialUpsertQuickStart extends QuickStartBase {
- private StreamDataServerStartable _kafkaStarter;
public static void main(String[] args)
throws Exception {
@@ -75,15 +69,7 @@ public class PartialUpsertQuickStart extends QuickStartBase {
final QuickstartRunner runner =
new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 1, dataDir,
getConfigOverrides());
- printStatus(Color.CYAN, "***** Starting Kafka *****");
- final ZkStarter.ZookeeperInstance zookeeperInstance =
ZkStarter.startLocalZkServer();
- try {
- _kafkaStarter =
StreamDataProvider.getServerDataStartable(KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME,
- KafkaStarterUtils.getDefaultKafkaConfiguration());
- } catch (Exception e) {
- throw new RuntimeException("Failed to start " +
KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, e);
- }
- _kafkaStarter.start();
+ startKafka();
_kafkaStarter.createTopic("meetupRSVPEvents",
KafkaStarterUtils.getTopicCreationProps(2));
printStatus(Color.CYAN, "***** Starting meetup data stream and publishing
to Kafka *****");
MeetupRsvpStream meetupRSVPProvider = new MeetupRsvpStream(true);
@@ -95,8 +81,6 @@ public class PartialUpsertQuickStart extends QuickStartBase {
printStatus(Color.GREEN, "***** Shutting down realtime quick start
*****");
runner.stop();
meetupRSVPProvider.stopPublishing();
- _kafkaStarter.stop();
- ZkStarter.stopLocalZkServer(zookeeperInstance);
FileUtils.deleteDirectory(quickstartTmpDir);
} catch (Exception e) {
e.printStackTrace();
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java
index 225aeb40fd..bab8c768cb 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java
@@ -24,15 +24,25 @@ import com.google.common.collect.ImmutableMap;
import java.io.File;
import java.io.IOException;
import java.net.URL;
+import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.LineIterator;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.utils.ZkStarter;
+import org.apache.pinot.spi.stream.StreamDataProducer;
+import org.apache.pinot.spi.stream.StreamDataProvider;
+import org.apache.pinot.spi.stream.StreamDataServerStartable;
import org.apache.pinot.tools.admin.command.QuickstartRunner;
+import org.apache.pinot.tools.streams.AirlineDataStream;
+import org.apache.pinot.tools.streams.MeetupRsvpStream;
import org.apache.pinot.tools.utils.JarUtils;
+import org.apache.pinot.tools.utils.KafkaStarterUtils;
import org.apache.pinot.tools.utils.PinotConfigUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,10 +73,19 @@ public abstract class QuickStartBase {
"examples/batch/githubComplexTypeEvents"
};
+ protected static final Map<String, String> DEFAULT_STREAM_TABLE_DIRECTORIES
= ImmutableMap.of(
+ "airlineStats", "examples/stream/airlineStats",
+ "githubEvents", "examples/minions/stream/githubEvents",
+ "meetupRsvp", "examples/stream/meetupRsvp",
+ "meetupRsvpJson", "examples/stream/meetupRsvpJson",
+ "meetupRsvpComplexType", "examples/stream/meetupRsvpComplexType");
+
protected File _dataDir = FileUtils.getTempDirectory();
protected String[] _bootstrapDataDirs;
protected String _zkExternalAddress;
protected String _configFilePath;
+ protected StreamDataServerStartable _kafkaStarter;
+ protected ZkStarter.ZookeeperInstance _zookeeperInstance;
public QuickStartBase setDataDir(String dataDir) {
_dataDir = new File(dataDir);
@@ -185,6 +204,25 @@ public abstract class QuickStartBase {
}
}
+ protected List<QuickstartTableRequest> bootstrapStreamTableDirectories(File
quickstartTmpDir)
+ throws IOException {
+ List<QuickstartTableRequest> quickstartTableRequests = new ArrayList<>();
+ for (Map.Entry<String, String> entry :
getDefaultStreamTableDirectories().entrySet()) {
+ String tableName = entry.getKey();
+ String directory = entry.getValue();
+ File baseDir = new File(quickstartTmpDir, tableName);
+ File dataDir = new File(baseDir, "rawdata");
+ dataDir.mkdirs();
+ if (useDefaultBootstrapTableDir()) {
+ copyResourceTableToTmpDirectory(directory, tableName, baseDir,
dataDir, true);
+ } else {
+ copyFilesystemTableToTmpDirectory(directory, tableName, baseDir);
+ }
+ quickstartTableRequests.add(new
QuickstartTableRequest(baseDir.getAbsolutePath()));
+ }
+ return quickstartTableRequests;
+ }
+
private static void copyFilesystemTableToTmpDirectory(String sourcePath,
String tableName, File baseDir)
throws IOException {
File fileDb = new File(sourcePath);
@@ -246,4 +284,130 @@ public abstract class QuickStartBase {
}
return responseBuilder.toString();
}
+
+ protected Map<String, String> getDefaultStreamTableDirectories() {
+ return DEFAULT_STREAM_TABLE_DIRECTORIES;
+ }
+
+ protected static void publishStreamDataToKafka(String tableName, File
dataDir)
+ throws Exception {
+ switch (tableName) {
+ case "githubEvents":
+ publishGithubEventsToKafka("githubEvents", new File(dataDir,
"/rawdata/2021-07-21-few-hours.json"));
+ break;
+ default:
+ break;
+ }
+ }
+
+ protected static void publishGithubEventsToKafka(String topicName, File
dataFile)
+ throws Exception {
+ Properties properties = new Properties();
+ properties.put("metadata.broker.list",
KafkaStarterUtils.DEFAULT_KAFKA_BROKER);
+ properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
+ properties.put("request.required.acks", "1");
+ StreamDataProducer producer =
+
StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME,
properties);
+ try {
+ LineIterator dataStream = FileUtils.lineIterator(dataFile);
+
+ while (dataStream.hasNext()) {
+ producer.produce(topicName,
dataStream.nextLine().getBytes(StandardCharsets.UTF_8));
+ }
+ } finally {
+ producer.close();
+ }
+ }
+
+ protected void startKafka() {
+ printStatus(Quickstart.Color.CYAN, "***** Starting Kafka *****");
+ _zookeeperInstance = ZkStarter.startLocalZkServer();
+ try {
+ _kafkaStarter =
StreamDataProvider.getServerDataStartable(KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME,
+ KafkaStarterUtils.getDefaultKafkaConfiguration(_zookeeperInstance));
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to start " +
KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, e);
+ }
+ _kafkaStarter.start();
+
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ try {
+ printStatus(Quickstart.Color.GREEN, "***** Shutting down kafka and
zookeeper *****");
+ _kafkaStarter.stop();
+ ZkStarter.stopLocalZkServer(_zookeeperInstance);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }));
+
+ printStatus(Quickstart.Color.CYAN, "***** Kafka Started *****");
+ }
+
+ public void startAllDataStreams(StreamDataServerStartable kafkaStarter, File
quickstartTmpDir)
+ throws Exception {
+ for (String streamName : getDefaultStreamTableDirectories().keySet()) {
+ switch (streamName) {
+ case "airlineStats":
+ kafkaStarter.createTopic("flights-realtime",
KafkaStarterUtils.getTopicCreationProps(10));
+ printStatus(Quickstart.Color.CYAN, "***** Starting airlineStats data
stream and publishing to Kafka *****");
+ AirlineDataStream airlineDataStream = new AirlineDataStream(new
File(quickstartTmpDir, "airlineStats"));
+ airlineDataStream.run();
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ try {
+ airlineDataStream.shutdown();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }));
+ break;
+ case "meetupRsvp":
+ kafkaStarter.createTopic("meetupRSVPEvents",
KafkaStarterUtils.getTopicCreationProps(10));
+ printStatus(Quickstart.Color.CYAN, "***** Starting meetup data
stream and publishing to Kafka *****");
+ MeetupRsvpStream meetupRSVPProvider = new MeetupRsvpStream();
+ meetupRSVPProvider.run();
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ try {
+ meetupRSVPProvider.stopPublishing();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }));
+ break;
+ case "meetupRsvpJson":
+ kafkaStarter.createTopic("meetupRSVPJsonEvents",
KafkaStarterUtils.getTopicCreationProps(10));
+ printStatus(Quickstart.Color.CYAN, "***** Starting meetupRsvpJson
data stream and publishing to Kafka *****");
+ MeetupRsvpStream meetupRSVPJsonProvider = new
MeetupRsvpStream("meetupRSVPJsonEvents");
+ meetupRSVPJsonProvider.run();
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ try {
+ meetupRSVPJsonProvider.stopPublishing();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }));
+ break;
+ case "meetupRsvpComplexType":
+ kafkaStarter.createTopic("meetupRSVPComplexTypeEvents",
KafkaStarterUtils.getTopicCreationProps(10));
+ printStatus(Quickstart.Color.CYAN,
+ "***** Starting meetupRSVPComplexType data stream and publishing
to Kafka *****");
+ MeetupRsvpStream meetupRSVPComplexTypeProvider = new
MeetupRsvpStream("meetupRSVPComplexTypeEvents");
+ meetupRSVPComplexTypeProvider.run();
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ try {
+ meetupRSVPComplexTypeProvider.stopPublishing();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }));
+ break;
+ case "githubEvents":
+ kafkaStarter.createTopic("githubEvents",
KafkaStarterUtils.getTopicCreationProps(2));
+ printStatus(Quickstart.Color.CYAN, "***** Starting githubEvents data
stream and publishing to Kafka *****");
+ publishStreamDataToKafka("githubEvents", new File(quickstartTmpDir,
"githubEvents"));
+ break;
+ default:
+ throw new UnsupportedOperationException("Unknown stream name: " +
streamName);
+ }
+ }
+ }
}
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/QuickstartTableRequest.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/QuickstartTableRequest.java
index f635dc098b..f0b66ef1df 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/QuickstartTableRequest.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/QuickstartTableRequest.java
@@ -19,6 +19,7 @@
package org.apache.pinot.tools;
import java.io.File;
+import java.util.Objects;
import org.apache.pinot.spi.config.table.TableType;
@@ -103,4 +104,24 @@ public class QuickstartTableRequest {
+ ", tableRequestFile = " + _tableRequestFile + ", ingestionJobFile =
" + _ingestionJobFile
+ ", bootstrapTableDir = " + _bootstrapTableDir + " }";
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ QuickstartTableRequest that = (QuickstartTableRequest) o;
+ return Objects.equals(_tableName, that._tableName) && _tableType ==
that._tableType
+ && Objects.equals(_schemaFile, that._schemaFile) &&
Objects.equals(_tableRequestFile,
+ that._tableRequestFile) && Objects.equals(_ingestionJobFile,
that._ingestionJobFile)
+ && Objects.equals(_bootstrapTableDir, that._bootstrapTableDir);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(_tableName, _tableType, _schemaFile,
_tableRequestFile, _ingestionJobFile, _bootstrapTableDir);
+ }
}
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeComplexTypeHandlingQuickStart.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeComplexTypeHandlingQuickStart.java
index 2d9e6f7d09..0027008c01 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeComplexTypeHandlingQuickStart.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeComplexTypeHandlingQuickStart.java
@@ -18,33 +18,20 @@
*/
package org.apache.pinot.tools;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import java.io.File;
-import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.utils.ZkStarter;
-import org.apache.pinot.spi.stream.StreamDataProvider;
-import org.apache.pinot.spi.stream.StreamDataServerStartable;
import org.apache.pinot.tools.Quickstart.Color;
import org.apache.pinot.tools.admin.PinotAdministrator;
import org.apache.pinot.tools.admin.command.QuickstartRunner;
-import org.apache.pinot.tools.streams.MeetupRsvpStream;
-import org.apache.pinot.tools.utils.KafkaStarterUtils;
-import static org.apache.pinot.tools.Quickstart.prettyPrintResponse;
-
-public class RealtimeComplexTypeHandlingQuickStart extends QuickStartBase {
+public class RealtimeComplexTypeHandlingQuickStart extends RealtimeQuickStart {
@Override
public List<String> types() {
- return Arrays.asList("REALTIME_COMPLEX_TYPE", "REALTIME-COMPLEX-TYPE",
- "STREAM_COMPLEX_TYPE", "STREAM-COMPLEX-TYPE");
+ return Arrays.asList("REALTIME_COMPLEX_TYPE", "REALTIME-COMPLEX-TYPE",
+ "STREAM_COMPLEX_TYPE", "STREAM-COMPLEX-TYPE");
}
- private StreamDataServerStartable _kafkaStarter;
public static void main(String[] args)
throws Exception {
@@ -54,71 +41,15 @@ public class RealtimeComplexTypeHandlingQuickStart extends
QuickStartBase {
PinotAdministrator.main(arguments.toArray(new String[arguments.size()]));
}
- public void execute()
+ @Override
+ public void runSampleQueries(QuickstartRunner runner)
throws Exception {
- File quickstartTmpDir = new File(_dataDir,
String.valueOf(System.currentTimeMillis()));
- File baseDir = new File(quickstartTmpDir, "meetupRsvp");
- File dataDir = new File(baseDir, "data");
- Preconditions.checkState(dataDir.mkdirs());
-
- File schemaFile = new File(baseDir, "meetupRsvp_schema.json");
- File tableConfigFile = new File(baseDir,
"meetupRsvp_realtime_table_config.json");
-
- ClassLoader classLoader = Quickstart.class.getClassLoader();
- URL resource =
classLoader.getResource("examples/stream/meetupRsvp/complexTypeHandling_meetupRsvp_schema.json");
- Preconditions.checkNotNull(resource);
- FileUtils.copyURLToFile(resource, schemaFile);
- resource =
-
classLoader.getResource("examples/stream/meetupRsvp/complexTypeHandling_meetupRsvp_realtime_table_config.json");
- Preconditions.checkNotNull(resource);
- FileUtils.copyURLToFile(resource, tableConfigFile);
-
- QuickstartTableRequest request = new
QuickstartTableRequest(baseDir.getAbsolutePath());
- QuickstartRunner runner =
- new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 1, dataDir,
getConfigOverrides());
-
- printStatus(Color.CYAN, "***** Starting Kafka *****");
- ZkStarter.ZookeeperInstance zookeeperInstance =
ZkStarter.startLocalZkServer();
- try {
- _kafkaStarter =
StreamDataProvider.getServerDataStartable(KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME,
- KafkaStarterUtils.getDefaultKafkaConfiguration());
- } catch (Exception e) {
- throw new RuntimeException("Failed to start " +
KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, e);
- }
- _kafkaStarter.start();
- _kafkaStarter.createTopic("meetupRSVPEvents",
KafkaStarterUtils.getTopicCreationProps(2));
- printStatus(Color.CYAN, "***** Starting meetup data stream and publishing
to Kafka *****");
- MeetupRsvpStream meetupRSVPProvider = new MeetupRsvpStream();
- meetupRSVPProvider.run();
- printStatus(Color.CYAN, "***** Starting Zookeeper, controller, server and
broker *****");
- runner.startAll();
- Runtime.getRuntime().addShutdownHook(new Thread(() -> {
- try {
- printStatus(Color.GREEN, "***** Shutting down realtime quick start
*****");
- runner.stop();
- meetupRSVPProvider.stopPublishing();
- _kafkaStarter.stop();
- ZkStarter.stopLocalZkServer(zookeeperInstance);
- FileUtils.deleteDirectory(quickstartTmpDir);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }));
- printStatus(Color.CYAN, "***** Bootstrap meetupRSVP table *****");
- runner.bootstrapTable();
- printStatus(Color.CYAN, "***** Waiting for 20 seconds for a few events to
get populated *****");
- Thread.sleep(20000);
-
- printStatus(Color.YELLOW, "***** Realtime json-index quickstart setup
complete *****");
-
String q1 =
- "select \"group.group_topics.urlkey\",
\"group.group_topics.topic_name\", \"group.group_id\" from meetupRsvp "
- + "limit 10";
+ "select \"group.group_topics.urlkey\",
\"group.group_topics.topic_name\", \"group.group_id\" from "
+ + "meetupRsvpComplexType limit 10";
printStatus(Color.YELLOW, "Events related to fitness");
printStatus(Color.CYAN, "Query : " + q1);
printStatus(Color.YELLOW, prettyPrintResponse(runner.runQuery(q1)));
printStatus(Color.GREEN,
"***************************************************");
-
- printStatus(Color.GREEN, "You can always go to http://localhost:9000 to
play around in the query console");
}
}
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeJsonIndexQuickStart.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeJsonIndexQuickStart.java
index e77d611fb0..75d643497e 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeJsonIndexQuickStart.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeJsonIndexQuickStart.java
@@ -18,33 +18,31 @@
*/
package org.apache.pinot.tools;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import java.io.File;
-import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.utils.ZkStarter;
-import org.apache.pinot.spi.stream.StreamDataProvider;
-import org.apache.pinot.spi.stream.StreamDataServerStartable;
import org.apache.pinot.tools.Quickstart.Color;
import org.apache.pinot.tools.admin.PinotAdministrator;
import org.apache.pinot.tools.admin.command.QuickstartRunner;
-import org.apache.pinot.tools.streams.MeetupRsvpStream;
-import org.apache.pinot.tools.utils.KafkaStarterUtils;
-import static org.apache.pinot.tools.Quickstart.prettyPrintResponse;
-
-public class RealtimeJsonIndexQuickStart extends QuickStartBase {
+public class RealtimeJsonIndexQuickStart extends RealtimeQuickStart {
@Override
public List<String> types() {
return Arrays.asList("REALTIME_JSON_INDEX", "REALTIME-JSON-INDEX",
"STREAM_JSON_INDEX", "STREAM-JSON-INDEX");
}
- private StreamDataServerStartable _kafkaStarter;
+ @Override
+ public void runSampleQueries(QuickstartRunner runner)
+ throws Exception {
+ String q1 = "select json_extract_scalar(event_json, '$.event_name',
'STRING') from meetupRsvpJson where json_match"
+ + "(group_json, '\"$.group_topics[*].topic_name\"=''topic_name0''')
limit 10";
+ printStatus(Color.YELLOW, "Events related to fitness");
+ printStatus(Color.CYAN, "Query : " + q1);
+ printStatus(Color.YELLOW, prettyPrintResponse(runner.runQuery(q1)));
+
+ printStatus(Color.GREEN,
"***************************************************");
+ }
public static void main(String[] args)
throws Exception {
@@ -53,70 +51,4 @@ public class RealtimeJsonIndexQuickStart extends
QuickStartBase {
arguments.addAll(Arrays.asList(args));
PinotAdministrator.main(arguments.toArray(new String[arguments.size()]));
}
-
- public void execute()
- throws Exception {
- File quickstartTmpDir = new File(_dataDir,
String.valueOf(System.currentTimeMillis()));
- File baseDir = new File(quickstartTmpDir, "meetupRsvp");
- File dataDir = new File(baseDir, "data");
- Preconditions.checkState(dataDir.mkdirs());
-
- File schemaFile = new File(baseDir, "meetupRsvp_schema.json");
- File tableConfigFile = new File(baseDir,
"meetupRsvp_realtime_table_config.json");
-
- ClassLoader classLoader = Quickstart.class.getClassLoader();
- URL resource =
classLoader.getResource("examples/stream/meetupRsvp/json_meetupRsvp_schema.json");
- Preconditions.checkNotNull(resource);
- FileUtils.copyURLToFile(resource, schemaFile);
- resource =
classLoader.getResource("examples/stream/meetupRsvp/json_meetupRsvp_realtime_table_config.json");
- Preconditions.checkNotNull(resource);
- FileUtils.copyURLToFile(resource, tableConfigFile);
-
- QuickstartTableRequest request = new
QuickstartTableRequest(baseDir.getAbsolutePath());
- QuickstartRunner runner =
- new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 1, dataDir,
getConfigOverrides());
-
- printStatus(Color.CYAN, "***** Starting Kafka *****");
- ZkStarter.ZookeeperInstance zookeeperInstance =
ZkStarter.startLocalZkServer();
- try {
- _kafkaStarter =
StreamDataProvider.getServerDataStartable(KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME,
- KafkaStarterUtils.getDefaultKafkaConfiguration(zookeeperInstance));
- } catch (Exception e) {
- throw new RuntimeException("Failed to start " +
KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, e);
- }
- _kafkaStarter.start();
- _kafkaStarter.createTopic("meetupRSVPEvents",
KafkaStarterUtils.getTopicCreationProps(2));
- printStatus(Color.CYAN, "***** Starting meetup data stream and publishing
to Kafka *****");
- MeetupRsvpStream meetupRSVPProvider = new MeetupRsvpStream();
- meetupRSVPProvider.run();
- printStatus(Color.CYAN, "***** Starting Zookeeper, controller, server and
broker *****");
- runner.startAll();
- Runtime.getRuntime().addShutdownHook(new Thread(() -> {
- try {
- printStatus(Color.GREEN, "***** Shutting down realtime quick start
*****");
- runner.stop();
- meetupRSVPProvider.stopPublishing();
- _kafkaStarter.stop();
- ZkStarter.stopLocalZkServer(zookeeperInstance);
- FileUtils.deleteDirectory(quickstartTmpDir);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }));
- printStatus(Color.CYAN, "***** Bootstrap meetupRSVP table *****");
- runner.bootstrapTable();
- printStatus(Color.CYAN, "***** Waiting for 20 seconds for a few events to
get populated *****");
- Thread.sleep(20000);
-
- printStatus(Color.YELLOW, "***** Realtime json-index quickstart setup
complete *****");
-
- String q1 = "select json_extract_scalar(event_json, '$.event_name',
'STRING') from meetupRsvp where json_match"
- + "(group_json, '\"$.group_topics[*].topic_name\"=''Fitness''') limit
10";
- printStatus(Color.YELLOW, "Events related to fitness");
- printStatus(Color.CYAN, "Query : " + q1);
- printStatus(Color.YELLOW, prettyPrintResponse(runner.runQuery(q1)));
-
- printStatus(Color.GREEN,
"***************************************************");
- printStatus(Color.GREEN, "You can always go to http://localhost:9000 to
play around in the query console");
- }
}
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java
index 626961c1ee..be8db25933 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java
@@ -19,23 +19,14 @@
package org.apache.pinot.tools;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
import java.io.File;
-import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.utils.ZkStarter;
-import org.apache.pinot.spi.stream.StreamDataProvider;
-import org.apache.pinot.spi.stream.StreamDataServerStartable;
import org.apache.pinot.tools.Quickstart.Color;
import org.apache.pinot.tools.admin.PinotAdministrator;
import org.apache.pinot.tools.admin.command.QuickstartRunner;
-import org.apache.pinot.tools.streams.MeetupRsvpStream;
-import org.apache.pinot.tools.utils.KafkaStarterUtils;
-
-import static org.apache.pinot.tools.Quickstart.prettyPrintResponse;
public class RealtimeQuickStart extends QuickStartBase {
@@ -44,8 +35,6 @@ public class RealtimeQuickStart extends QuickStartBase {
return Arrays.asList("REALTIME", "STREAM");
}
- private StreamDataServerStartable _kafkaStarter;
-
public static void main(String[] args)
throws Exception {
List<String> arguments = new ArrayList<>();
@@ -54,62 +43,9 @@ public class RealtimeQuickStart extends QuickStartBase {
PinotAdministrator.main(arguments.toArray(new String[arguments.size()]));
}
- public void execute()
+ @Override
+ public void runSampleQueries(QuickstartRunner runner)
throws Exception {
- File quickstartTmpDir = new File(_dataDir,
String.valueOf(System.currentTimeMillis()));
- File baseDir = new File(quickstartTmpDir, "meetupRsvp");
- File dataDir = new File(baseDir, "rawdata");
- Preconditions.checkState(dataDir.mkdirs());
-
- File schemaFile = new File(baseDir, "meetupRsvp_schema.json");
- File tableConfigFile = new File(baseDir,
"meetupRsvp_realtime_table_config.json");
-
- ClassLoader classLoader = Quickstart.class.getClassLoader();
- URL resource =
classLoader.getResource("examples/stream/meetupRsvp/meetupRsvp_schema.json");
- com.google.common.base.Preconditions.checkNotNull(resource);
- FileUtils.copyURLToFile(resource, schemaFile);
- resource =
classLoader.getResource("examples/stream/meetupRsvp/meetupRsvp_realtime_table_config.json");
- com.google.common.base.Preconditions.checkNotNull(resource);
- FileUtils.copyURLToFile(resource, tableConfigFile);
-
- QuickstartTableRequest request = new
QuickstartTableRequest(baseDir.getAbsolutePath());
- final QuickstartRunner runner =
- new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 1, dataDir,
getConfigOverrides());
-
- printStatus(Color.CYAN, "***** Starting Kafka *****");
- final ZkStarter.ZookeeperInstance zookeeperInstance =
ZkStarter.startLocalZkServer();
- try {
- _kafkaStarter =
StreamDataProvider.getServerDataStartable(KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME,
- KafkaStarterUtils.getDefaultKafkaConfiguration(zookeeperInstance));
- } catch (Exception e) {
- throw new RuntimeException("Failed to start " +
KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, e);
- }
- _kafkaStarter.start();
- _kafkaStarter.createTopic("meetupRSVPEvents",
KafkaStarterUtils.getTopicCreationProps(10));
- printStatus(Color.CYAN, "***** Starting meetup data stream and publishing
to Kafka *****");
- MeetupRsvpStream meetupRSVPProvider = new MeetupRsvpStream();
- meetupRSVPProvider.run();
- printStatus(Color.CYAN, "***** Starting Zookeeper, controller, server and
broker *****");
- runner.startAll();
- Runtime.getRuntime().addShutdownHook(new Thread(() -> {
- try {
- printStatus(Color.GREEN, "***** Shutting down realtime quick start
*****");
- runner.stop();
- meetupRSVPProvider.stopPublishing();
- _kafkaStarter.stop();
- ZkStarter.stopLocalZkServer(zookeeperInstance);
- FileUtils.deleteDirectory(quickstartTmpDir);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }));
- printStatus(Color.CYAN, "***** Bootstrap meetupRSVP table *****");
- runner.bootstrapTable();
- printStatus(Color.CYAN, "***** Waiting for 5 seconds for a few events to
get populated *****");
- Thread.sleep(5000);
-
- printStatus(Color.YELLOW, "***** Realtime quickstart setup complete
*****");
-
String q1 = "select count(*) from meetupRsvp limit 1";
printStatus(Color.YELLOW, "Total number of documents in the table");
printStatus(Color.CYAN, "Query : " + q1);
@@ -141,6 +77,40 @@ public class RealtimeQuickStart extends QuickStartBase {
printStatus(Color.CYAN, "Query : " + q5);
printStatus(Color.YELLOW, prettyPrintResponse(runner.runQuery(q5)));
printStatus(Color.GREEN,
"***************************************************");
+ }
+
+ public void execute()
+ throws Exception {
+ File quickstartTmpDir = new File(_dataDir,
String.valueOf(System.currentTimeMillis()));
+ File quickstartRunnerDir = new File(quickstartTmpDir, "quickstart");
+ Preconditions.checkState(quickstartRunnerDir.mkdirs());
+ List<QuickstartTableRequest> quickstartTableRequests =
bootstrapStreamTableDirectories(quickstartTmpDir);
+ final QuickstartRunner runner =
+ new QuickstartRunner(quickstartTableRequests, 1, 1, 1, 1,
quickstartRunnerDir, getConfigOverrides());
+
+ startKafka();
+ startAllDataStreams(_kafkaStarter, quickstartTmpDir);
+
+ printStatus(Color.CYAN, "***** Starting Zookeeper, controller, broker,
server and minion *****");
+ runner.startAll();
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ try {
+ printStatus(Color.GREEN, "***** Shutting down realtime quick start
*****");
+ runner.stop();
+ FileUtils.deleteDirectory(quickstartTmpDir);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }));
+
+ printStatus(Color.CYAN, "***** Bootstrap all tables *****");
+ runner.bootstrapTable();
+
+ printStatus(Color.CYAN, "***** Waiting for 5 seconds for a few events to
get populated *****");
+ Thread.sleep(5000);
+
+ printStatus(Color.YELLOW, "***** Realtime quickstart setup complete
*****");
+ runSampleQueries(runner);
printStatus(Color.GREEN, "You can always go to http://localhost:9000 to
play around in the query console");
}
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStartWithMinion.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStartWithMinion.java
index 60786d82a3..b51baf6179 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStartWithMinion.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStartWithMinion.java
@@ -18,122 +18,29 @@
*/
package org.apache.pinot.tools;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import java.io.File;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.LineIterator;
-import org.apache.pinot.common.utils.ZkStarter;
-import org.apache.pinot.spi.stream.StreamDataProducer;
-import org.apache.pinot.spi.stream.StreamDataProvider;
-import org.apache.pinot.spi.stream.StreamDataServerStartable;
import org.apache.pinot.tools.Quickstart.Color;
import org.apache.pinot.tools.admin.PinotAdministrator;
import org.apache.pinot.tools.admin.command.QuickstartRunner;
-import org.apache.pinot.tools.utils.KafkaStarterUtils;
-
-import static org.apache.pinot.tools.Quickstart.prettyPrintResponse;
/**
* This quickstart shows how RealtimeToOfflineSegmentsTask and MergeRollupTask
minion
* tasks continuously optimize segments as data gets ingested into Realtime
table.
*/
-public class RealtimeQuickStartWithMinion extends QuickStartBase {
+public class RealtimeQuickStartWithMinion extends RealtimeQuickStart {
@Override
public List<String> types() {
return Arrays.asList("REALTIME_MINION", "REALTIME-MINION");
}
- private StreamDataServerStartable _kafkaStarter;
-
- public static void main(String[] args)
- throws Exception {
- List<String> arguments = new ArrayList<>();
- arguments.addAll(Arrays.asList("QuickStart", "-type", "REALTIME-MINION"));
- arguments.addAll(Arrays.asList(args));
- PinotAdministrator.main(arguments.toArray(new String[arguments.size()]));
- }
-
- public Map<String, Object> getConfigOverrides() {
- Map<String, Object> properties = new HashMap<>(super.getConfigOverrides());
- properties.putIfAbsent("controller.task.scheduler.enabled", true);
- return properties;
- }
-
- public void execute()
+ @Override
+ public void runSampleQueries(QuickstartRunner runner)
throws Exception {
- File quickstartTmpDir = new File(_dataDir,
String.valueOf(System.currentTimeMillis()));
- File baseDir = new File(quickstartTmpDir, "githubEvents");
- File dataDir = new File(baseDir, "rawdata");
- Preconditions.checkState(dataDir.mkdirs());
-
- File schemaFile = new File(baseDir, "githubEvents_schema.json");
- File realtimeTableConfigFile = new File(baseDir,
"githubEvents_realtime_table_config.json");
- File offlineTableConfigFile = new File(baseDir,
"githubEvents_offline_table_config.json");
- File inputDataFile = new File(baseDir, "2021-07-21-few-hours.json");
-
- ClassLoader classLoader = Quickstart.class.getClassLoader();
- URL resource =
classLoader.getResource("examples/minions/stream/githubEvents/githubEvents_schema.json");
- Preconditions.checkNotNull(resource);
- FileUtils.copyURLToFile(resource, schemaFile);
- resource =
classLoader.getResource("examples/minions/stream/githubEvents/githubEvents_realtime_table_config.json");
- Preconditions.checkNotNull(resource);
- FileUtils.copyURLToFile(resource, realtimeTableConfigFile);
- resource =
classLoader.getResource("examples/minions/stream/githubEvents/githubEvents_offline_table_config.json");
- Preconditions.checkNotNull(resource);
- FileUtils.copyURLToFile(resource, offlineTableConfigFile);
- resource = Quickstart.class.getClassLoader()
-
.getResource("examples/minions/stream/githubEvents/rawdata/2021-07-21-few-hours.json");
- Preconditions.checkNotNull(resource);
- FileUtils.copyURLToFile(resource, inputDataFile);
-
- QuickstartTableRequest request = new
QuickstartTableRequest(baseDir.getAbsolutePath());
- QuickstartRunner runner =
- new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 1,
- dataDir, true, null, getConfigOverrides(), null, true);
-
- printStatus(Color.CYAN, "***** Starting Kafka *****");
- final ZkStarter.ZookeeperInstance zookeeperInstance =
ZkStarter.startLocalZkServer();
- try {
- _kafkaStarter =
StreamDataProvider.getServerDataStartable(KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME,
- KafkaStarterUtils.getDefaultKafkaConfiguration(zookeeperInstance));
- } catch (Exception e) {
- throw new RuntimeException("Failed to start " +
KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, e);
- }
- _kafkaStarter.start();
-
- printStatus(Color.CYAN, "***** Starting Zookeeper, controller, server and
broker *****");
- runner.startAll();
- Runtime.getRuntime().addShutdownHook(new Thread(() -> {
- try {
- printStatus(Color.GREEN, "***** Shutting down realtime-minion quick
start *****");
- runner.stop();
- _kafkaStarter.stop();
- ZkStarter.stopLocalZkServer(zookeeperInstance);
- FileUtils.deleteDirectory(quickstartTmpDir);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }));
-
- printStatus(Color.CYAN, "***** Sending events to Kafka *****");
- _kafkaStarter.createTopic("githubEvents",
KafkaStarterUtils.getTopicCreationProps(2));
- publishGithubEventsToKafka("githubEvents", inputDataFile);
-
- printStatus(Color.CYAN, "***** Bootstrap githubEvents tables *****");
- runner.bootstrapTable();
- printStatus(Color.CYAN, "***** Waiting for 5 seconds for a few events to
get populated *****");
- Thread.sleep(5000);
-
printStatus(Color.YELLOW, "***** Realtime-minion quickstart setup complete
*****");
String q1 = "select count(*) from githubEvents limit 1";
@@ -142,26 +49,21 @@ public class RealtimeQuickStartWithMinion extends
QuickStartBase {
printStatus(Color.YELLOW, prettyPrintResponse(runner.runQuery(q1)));
printStatus(Color.GREEN,
"***************************************************");
- printStatus(Color.GREEN, "You can always go to http://localhost:9000 to
play around in the query console");
printStatus(Color.GREEN, "In particular, you will find that OFFLINE table
gets segments from REALTIME table;");
printStatus(Color.GREEN, "and segments in OFFLINE table get merged into
larger ones within a few minutes.");
}
- private static void publishGithubEventsToKafka(String topicName, File
dataFile)
+ public static void main(String[] args)
throws Exception {
- Properties properties = new Properties();
- properties.put("metadata.broker.list",
KafkaStarterUtils.DEFAULT_KAFKA_BROKER);
- properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
- properties.put("request.required.acks", "1");
- StreamDataProducer producer =
-
StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME,
properties);
- try {
- LineIterator dataStream = FileUtils.lineIterator(dataFile);
- while (dataStream.hasNext()) {
- producer.produce(topicName,
dataStream.nextLine().getBytes(StandardCharsets.UTF_8));
- }
- } finally {
- producer.close();
- }
+ List<String> arguments = new ArrayList<>();
+ arguments.addAll(Arrays.asList("QuickStart", "-type", "REALTIME-MINION"));
+ arguments.addAll(Arrays.asList(args));
+ PinotAdministrator.main(arguments.toArray(new String[arguments.size()]));
+ }
+
+ public Map<String, Object> getConfigOverrides() {
+ Map<String, Object> properties = new HashMap<>(super.getConfigOverrides());
+ properties.putIfAbsent("controller.task.scheduler.enabled", true);
+ return properties;
}
}
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertJsonQuickStart.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertJsonQuickStart.java
index 383d2593ec..22af89ff27 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertJsonQuickStart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertJsonQuickStart.java
@@ -26,9 +26,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.utils.ZkStarter;
-import org.apache.pinot.spi.stream.StreamDataProvider;
-import org.apache.pinot.spi.stream.StreamDataServerStartable;
import org.apache.pinot.tools.Quickstart.Color;
import org.apache.pinot.tools.admin.PinotAdministrator;
import org.apache.pinot.tools.admin.command.QuickstartRunner;
@@ -36,8 +33,6 @@ import org.apache.pinot.tools.streams.MeetupRsvpStream;
import org.apache.pinot.tools.streams.RsvpSourceGenerator;
import org.apache.pinot.tools.utils.KafkaStarterUtils;
-import static org.apache.pinot.tools.Quickstart.prettyPrintResponse;
-
public class UpsertJsonQuickStart extends QuickStartBase {
@Override
@@ -45,8 +40,6 @@ public class UpsertJsonQuickStart extends QuickStartBase {
return Arrays.asList("UPSERT_JSON_INDEX", "UPSERT-JSON-INDEX");
}
- private StreamDataServerStartable _kafkaStarter;
-
public static void main(String[] args)
throws Exception {
List<String> arguments = new ArrayList<>();
@@ -77,15 +70,7 @@ public class UpsertJsonQuickStart extends QuickStartBase {
QuickstartRunner runner =
new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 1, dataDir,
getConfigOverrides());
- printStatus(Color.CYAN, "***** Starting Kafka *****");
- ZkStarter.ZookeeperInstance zookeeperInstance =
ZkStarter.startLocalZkServer();
- try {
- _kafkaStarter =
StreamDataProvider.getServerDataStartable(KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME,
- KafkaStarterUtils.getDefaultKafkaConfiguration(zookeeperInstance));
- } catch (Exception e) {
- throw new RuntimeException("Failed to start " +
KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, e);
- }
- _kafkaStarter.start();
+ startKafka();
_kafkaStarter.createTopic("meetupRSVPEvents",
KafkaStarterUtils.getTopicCreationProps(2));
printStatus(Color.CYAN, "***** Starting meetup data stream and publishing
to Kafka *****");
MeetupRsvpStream meetupRSVPProvider = new
MeetupRsvpStream(RsvpSourceGenerator.KeyColumn.RSVP_ID);
@@ -97,8 +82,6 @@ public class UpsertJsonQuickStart extends QuickStartBase {
printStatus(Color.GREEN, "***** Shutting down realtime quick start
*****");
runner.stop();
meetupRSVPProvider.stopPublishing();
- _kafkaStarter.stop();
- ZkStarter.stopLocalZkServer(zookeeperInstance);
FileUtils.deleteDirectory(quickstartTmpDir);
} catch (Exception e) {
e.printStackTrace();
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertQuickStart.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertQuickStart.java
index ccf219325d..d67bbcdaf6 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertQuickStart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertQuickStart.java
@@ -27,17 +27,12 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.utils.ZkStarter;
-import org.apache.pinot.spi.stream.StreamDataProvider;
-import org.apache.pinot.spi.stream.StreamDataServerStartable;
import org.apache.pinot.tools.Quickstart.Color;
import org.apache.pinot.tools.admin.PinotAdministrator;
import org.apache.pinot.tools.admin.command.QuickstartRunner;
import org.apache.pinot.tools.streams.MeetupRsvpStream;
import org.apache.pinot.tools.utils.KafkaStarterUtils;
-import static org.apache.pinot.tools.Quickstart.prettyPrintResponse;
-
public class UpsertQuickStart extends QuickStartBase {
@Override
@@ -45,8 +40,6 @@ public class UpsertQuickStart extends QuickStartBase {
return Collections.singletonList("UPSERT");
}
- private StreamDataServerStartable _kafkaStarter;
-
public static void main(String[] args)
throws Exception {
List<String> arguments = new ArrayList<>();
@@ -77,15 +70,7 @@ public class UpsertQuickStart extends QuickStartBase {
final QuickstartRunner runner
= new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 1,
dataDir, getConfigOverrides());
- printStatus(Color.CYAN, "***** Starting Kafka *****");
- final ZkStarter.ZookeeperInstance zookeeperInstance =
ZkStarter.startLocalZkServer();
- try {
- _kafkaStarter =
StreamDataProvider.getServerDataStartable(KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME,
- KafkaStarterUtils.getDefaultKafkaConfiguration(zookeeperInstance));
- } catch (Exception e) {
- throw new RuntimeException("Failed to start " +
KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, e);
- }
- _kafkaStarter.start();
+ startKafka();
_kafkaStarter.createTopic("meetupRSVPEvents",
KafkaStarterUtils.getTopicCreationProps(2));
printStatus(Color.CYAN, "***** Starting meetup data stream and publishing
to Kafka *****");
MeetupRsvpStream meetupRSVPProvider = new MeetupRsvpStream(true);
@@ -97,8 +82,6 @@ public class UpsertQuickStart extends QuickStartBase {
printStatus(Color.GREEN, "***** Shutting down realtime quick start
*****");
runner.stop();
meetupRSVPProvider.stopPublishing();
- _kafkaStarter.stop();
- ZkStarter.stopLocalZkServer(zookeeperInstance);
FileUtils.deleteDirectory(quickstartTmpDir);
} catch (Exception e) {
e.printStackTrace();
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java
index 2dee145ac0..9b4aea8060 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java
@@ -25,6 +25,7 @@ import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.stream.StreamDataProducer;
import org.apache.pinot.spi.stream.StreamDataProvider;
+import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.tools.QuickStartBase;
import org.apache.pinot.tools.Quickstart;
import org.apache.pinot.tools.utils.KafkaStarterUtils;
@@ -42,11 +43,6 @@ public class AirlineDataStream {
private StreamDataProducer _producer;
private PinotRealtimeSource _pinotStream;
- public AirlineDataStream(Schema pinotSchema, TableConfig tableConfig, File
avroFile)
- throws Exception {
- this(pinotSchema, tableConfig, avroFile, getDefaultKafkaProducer());
- }
-
public AirlineDataStream(Schema pinotSchema, TableConfig tableConfig, File
avroFile, StreamDataProducer producer)
throws IOException {
_pinotSchema = pinotSchema;
@@ -63,6 +59,13 @@ public class AirlineDataStream {
+ "every 60 events (which is approximately 60 seconds) *****");
}
+ public AirlineDataStream(File baseDir)
+ throws Exception {
+ this(Schema.fromFile(new File(baseDir, "airlineStats_schema.json")),
+ JsonUtils.fileToObject(new File(baseDir,
"airlineStats_realtime_table_config.json"), TableConfig.class),
+ new File(baseDir, "rawdata/airlineStats_data.avro"),
getDefaultKafkaProducer());
+ }
+
public static StreamDataProducer getDefaultKafkaProducer()
throws Exception {
Properties properties = new Properties();
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
index f42ffd7fe9..527ca283fb 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
@@ -30,23 +30,35 @@ import org.slf4j.LoggerFactory;
public class MeetupRsvpStream {
protected static final Logger LOGGER =
LoggerFactory.getLogger(MeetupRsvpStream.class);
private static final String DEFAULT_TOPIC_NAME = "meetupRSVPEvents";
- protected String _topicName = DEFAULT_TOPIC_NAME;
+ protected String _topicName;
protected PinotRealtimeSource _pinotRealtimeSource;
public MeetupRsvpStream()
throws Exception {
- this(false);
+ this(DEFAULT_TOPIC_NAME, RsvpSourceGenerator.KeyColumn.NONE);
}
public MeetupRsvpStream(boolean partitionByKey)
throws Exception {
// calling this constructor means that we wish to use EVENT_ID as key.
RsvpId is used by MeetupRsvpJsonStream
- this(partitionByKey ? RsvpSourceGenerator.KeyColumn.EVENT_ID :
RsvpSourceGenerator.KeyColumn.NONE);
+ this(DEFAULT_TOPIC_NAME,
+ partitionByKey ? RsvpSourceGenerator.KeyColumn.EVENT_ID :
RsvpSourceGenerator.KeyColumn.NONE);
+ }
+
+ public MeetupRsvpStream(String topicName)
+ throws Exception {
+ this(topicName, RsvpSourceGenerator.KeyColumn.NONE);
}
public MeetupRsvpStream(RsvpSourceGenerator.KeyColumn keyColumn)
throws Exception {
+ this(DEFAULT_TOPIC_NAME, keyColumn);
+ }
+
+ public MeetupRsvpStream(String topicName, RsvpSourceGenerator.KeyColumn
keyColumn)
+ throws Exception {
+ _topicName = topicName;
Properties properties = new Properties();
properties.put("metadata.broker.list",
KafkaStarterUtils.DEFAULT_KAFKA_BROKER);
properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/RsvpSourceGenerator.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/RsvpSourceGenerator.java
index d192d29d1b..cf04a9acbe 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/RsvpSourceGenerator.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/RsvpSourceGenerator.java
@@ -19,6 +19,7 @@
package org.apache.pinot.tools.streams;
+import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableList;
import java.time.LocalDateTime;
@@ -50,17 +51,62 @@ public class RsvpSourceGenerator implements
PinotSourceDataGenerator {
public RSVP createMessage() {
String eventId = Math.abs(ThreadLocalRandom.current().nextLong()) + "";
ObjectNode json = JsonUtils.newObjectNode();
- json.put("venue_name", "venue_name" +
ThreadLocalRandom.current().nextInt());
- json.put("event_name", "event_name" +
ThreadLocalRandom.current().nextInt());
+ ObjectNode eventJson = JsonUtils.newObjectNode();
+ json.set("event", eventJson);
+ ObjectNode groupJson = JsonUtils.newObjectNode();
+ json.set("group", groupJson);
+ ObjectNode venueJson = JsonUtils.newObjectNode();
+ json.set("venue", venueJson);
+
+ String venueName = "venue_name" + ThreadLocalRandom.current().nextInt();
+ venueJson.put("venue_name", venueName);
+ json.put("venue_name", venueName);
+
+ String eventName = "event_name" + ThreadLocalRandom.current().nextInt();
+ eventJson.put("event_name", eventName);
+ json.put("event_name", eventName);
+
json.put("event_id", eventId);
json.put("event_time",
DATE_TIME_FORMATTER.format(LocalDateTime.now().plusDays(10)));
- json.put("group_city", "group_city" +
ThreadLocalRandom.current().nextInt(1000));
- json.put("group_country", "group_country" +
ThreadLocalRandom.current().nextInt(100));
- json.put("group_id", Math.abs(ThreadLocalRandom.current().nextLong()));
- json.put("group_name", "group_name" +
ThreadLocalRandom.current().nextInt());
- json.put("group_lat", ThreadLocalRandom.current().nextDouble(-90.0, 90.0));
- json.put("group_lon", ThreadLocalRandom.current().nextDouble(180.0));
+
+ ArrayNode groupTopicsJson = JsonUtils.newArrayNode();
+ groupJson.set("group_topics", groupTopicsJson);
+ for (int i = 0; i < ThreadLocalRandom.current().nextInt(5) + 1; i++) {
+ ObjectNode groupTopicJson = JsonUtils.newObjectNode();
+ groupTopicJson.put("topic_name", "topic_name" +
ThreadLocalRandom.current().nextInt(10));
+ groupTopicJson.put("urlkey", "http://group-url-" +
ThreadLocalRandom.current().nextInt(1000));
+ groupTopicsJson.add(groupTopicJson);
+ }
+
+ String groupCity = "group_city" +
ThreadLocalRandom.current().nextInt(1000);
+ groupJson.put("group_city", groupCity);
+ json.put("group_city", groupCity);
+
+ String groupCountry = "group_country" +
ThreadLocalRandom.current().nextInt(100);
+ groupJson.put("group_country", groupCountry);
+ json.put("group_country", groupCountry);
+
+ long groupId = Math.abs(ThreadLocalRandom.current().nextLong());
+ groupJson.put("group_id", groupId);
+ json.put("group_id", groupId);
+
+ String groupName = "group_name" + ThreadLocalRandom.current().nextInt();
+ groupJson.put("group_name", groupName);
+ json.put("group_name", groupName);
+
+ double groupLat = ThreadLocalRandom.current().nextDouble(-90.0, 90.0);
+ groupJson.put("group_lat", groupLat);
+ json.put("group_lat", groupLat);
+
+ double groupLon = ThreadLocalRandom.current().nextDouble(-180.0, 180.0);
+ groupJson.put("group_lon", groupLon);
+ json.put("group_lon", groupLon);
+
json.put("mtime", DateTime.now().getMillis());
+
+ json.put("rsvp_id", ThreadLocalRandom.current().nextLong());
+ json.put("guests", ThreadLocalRandom.current().nextInt(100));
+
json.put("rsvp_count", 1);
return new RSVP(eventId, eventId, json);
}
@@ -91,6 +137,7 @@ public class RsvpSourceGenerator implements
PinotSourceDataGenerator {
public void close()
throws Exception {
}
+
public enum KeyColumn {
NONE,
EVENT_ID,
diff --git
a/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_realtime_table_config.json
b/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_realtime_table_config.json
index cb5d690628..94da8849f2 100644
---
a/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_realtime_table_config.json
+++
b/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_realtime_table_config.json
@@ -29,7 +29,33 @@
"stream.kafka.consumer.prop.auto.offset.reset": "smallest"
}
},
+ "fieldConfigList": [
+ {
+ "name": "ts",
+ "encodingType": "DICTIONARY",
+ "indexTypes": ["TIMESTAMP"],
+ "timestampConfig": {
+ "granularities": [
+ "DAY",
+ "WEEK",
+ "MONTH"
+ ]
+ }
+ }
+ ],
"metadata": {
"customConfigs": {}
+ },
+ "ingestionConfig": {
+ "transformConfigs": [
+ {
+ "columnName": "ts",
+ "transformFunction": "fromEpochDays(DaysSinceEpoch)"
+ },
+ {
+ "columnName": "tsRaw",
+ "transformFunction": "fromEpochDays(DaysSinceEpoch)"
+ }
+ ]
}
}
diff --git
a/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_schema.json
b/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_schema.json
index 97757af519..563e5caa19 100644
---
a/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_schema.json
+++
b/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_schema.json
@@ -330,6 +330,18 @@
"dataType": "INT",
"format": "1:DAYS:EPOCH",
"granularity": "1:DAYS"
+ },
+ {
+ "name": "ts",
+ "dataType": "TIMESTAMP",
+ "format": "1:MILLISECONDS:TIMESTAMP",
+ "granularity": "1:SECONDS"
+ },
+ {
+ "name": "tsRaw",
+ "dataType": "TIMESTAMP",
+ "format": "1:MILLISECONDS:TIMESTAMP",
+ "granularity": "1:SECONDS"
}
],
"schemaName": "airlineStats"
diff --git
a/pinot-tools/src/main/resources/examples/stream/airlineStats/sample_data/airlineStats_data.avro
b/pinot-tools/src/main/resources/examples/stream/airlineStats/rawdata/airlineStats_data.avro
similarity index 100%
rename from
pinot-tools/src/main/resources/examples/stream/airlineStats/sample_data/airlineStats_data.avro
rename to
pinot-tools/src/main/resources/examples/stream/airlineStats/rawdata/airlineStats_data.avro
diff --git
a/pinot-tools/src/main/resources/examples/stream/airlineStats/sample_data/airlineStats_data.json
b/pinot-tools/src/main/resources/examples/stream/airlineStats/rawdata/airlineStats_data.json
similarity index 100%
rename from
pinot-tools/src/main/resources/examples/stream/airlineStats/sample_data/airlineStats_data.json
rename to
pinot-tools/src/main/resources/examples/stream/airlineStats/rawdata/airlineStats_data.json
diff --git
a/pinot-tools/src/main/resources/examples/stream/airlineStats/sample_data/airlineStats_data.orc
b/pinot-tools/src/main/resources/examples/stream/airlineStats/rawdata/airlineStats_data.orc
similarity index 100%
rename from
pinot-tools/src/main/resources/examples/stream/airlineStats/sample_data/airlineStats_data.orc
rename to
pinot-tools/src/main/resources/examples/stream/airlineStats/rawdata/airlineStats_data.orc
diff --git
a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/kafka_0.9/meetupRsvp_realtime_table_config.json
b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/kafka_0.9/meetupRsvp_realtime_table_config.json
index 6742b2177b..e2420e12ab 100644
---
a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/kafka_0.9/meetupRsvp_realtime_table_config.json
+++
b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/kafka_0.9/meetupRsvp_realtime_table_config.json
@@ -8,9 +8,25 @@
"segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
"schemaName": "meetupRsvp",
"replication": "1",
- "replicasPerPartition": "1"
+ "replicasPerPartition": "1",
+ "retentionTimeUnit": "DAYS",
+ "retentionTimeValue": "1"
},
"tenants": {},
+ "fieldConfigList": [
+ {
+ "name": "mtime",
+ "encodingType": "DICTIONARY",
+ "indexTypes": ["TIMESTAMP"],
+ "timestampConfig": {
+ "granularities": [
+ "DAY",
+ "WEEK",
+ "MONTH"
+ ]
+ }
+ }
+ ],
"tableIndexConfig": {
"loadMode": "MMAP",
"streamConfigs": {
diff --git
a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/kafka_2.0/meetupRsvp_realtime_table_config.json
b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/kafka_2.0/meetupRsvp_realtime_table_config.json
index 1a144425a2..41379c2569 100644
---
a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/kafka_2.0/meetupRsvp_realtime_table_config.json
+++
b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/kafka_2.0/meetupRsvp_realtime_table_config.json
@@ -8,9 +8,25 @@
"segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
"schemaName": "meetupRsvp",
"replication": "1",
- "replicasPerPartition": "1"
+ "replicasPerPartition": "1",
+ "retentionTimeUnit": "DAYS",
+ "retentionTimeValue": "1"
},
"tenants": {},
+ "fieldConfigList": [
+ {
+ "name": "mtime",
+ "encodingType": "DICTIONARY",
+ "indexTypes": ["TIMESTAMP"],
+ "timestampConfig": {
+ "granularities": [
+ "DAY",
+ "WEEK",
+ "MONTH"
+ ]
+ }
+ }
+ ],
"tableIndexConfig": {
"loadMode": "MMAP",
"streamConfigs": {
diff --git
a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/json_meetupRsvp_realtime_table_config.json
b/pinot-tools/src/main/resources/examples/stream/meetupRsvpComplexType/meetupRsvpComplexType_realtime_table_config.json
similarity index 57%
copy from
pinot-tools/src/main/resources/examples/stream/meetupRsvp/json_meetupRsvp_realtime_table_config.json
copy to
pinot-tools/src/main/resources/examples/stream/meetupRsvpComplexType/meetupRsvpComplexType_realtime_table_config.json
index 7b20d696e1..43662de8fd 100644
---
a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/json_meetupRsvp_realtime_table_config.json
+++
b/pinot-tools/src/main/resources/examples/stream/meetupRsvpComplexType/meetupRsvpComplexType_realtime_table_config.json
@@ -1,5 +1,5 @@
{
- "tableName": "meetupRsvp",
+ "tableName": "meetupRsvpComplexType",
"tableType": "REALTIME",
"tenants": {},
"segmentsConfig": {
@@ -16,7 +16,7 @@
{
"streamType": "kafka",
"stream.kafka.consumer.type": "lowLevel",
- "stream.kafka.topic.name": "meetupRSVPEvents",
+ "stream.kafka.topic.name": "meetupRSVPComplexTypeEvents",
"stream.kafka.decoder.class.name":
"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"stream.kafka.hlc.zk.connect.string": "localhost:2191/kafka",
"stream.kafka.consumer.factory.class.name":
"org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
@@ -26,38 +26,15 @@
]
},
"transformConfigs": [
- {
- "columnName": "event_json",
- "transformFunction": "jsonFormat(event)"
- },
- {
- "columnName": "group_json",
- "transformFunction": "jsonFormat(\"group\")"
- },
- {
- "columnName": "member_json",
- "transformFunction": "jsonFormat(member)"
- },
- {
- "columnName": "venue_json",
- "transformFunction": "jsonFormat(venue)"
- }
- ]
+ ],
+ "complexTypeConfig": {
+ "fieldsToUnnest": [
+ "group.group_topics"
+ ]
+ }
},
"tableIndexConfig": {
- "loadMode": "MMAP",
- "noDictionaryColumns": [
- "event_json",
- "group_json",
- "member_json",
- "venue_json"
- ],
- "jsonIndexColumns": [
- "event_json",
- "group_json",
- "member_json",
- "venue_json"
- ]
+ "loadMode": "MMAP"
},
"metadata": {
"customConfigs": {}
diff --git
a/pinot-tools/src/main/resources/examples/stream/meetupRsvpComplexType/meetupRsvpComplexType_schema.json
b/pinot-tools/src/main/resources/examples/stream/meetupRsvpComplexType/meetupRsvpComplexType_schema.json
new file mode 100644
index 0000000000..0463c01cc1
--- /dev/null
+++
b/pinot-tools/src/main/resources/examples/stream/meetupRsvpComplexType/meetupRsvpComplexType_schema.json
@@ -0,0 +1,63 @@
+{
+ "schemaName": "meetupRsvpComplexType",
+ "dimensionFieldSpecs": [
+ {
+ "name": "group.group_topics.urlkey",
+ "dataType": "STRING"
+ },
+ {
+ "name": "group.group_topics.topic_name",
+ "dataType": "STRING"
+ },
+ {
+ "name": "group.group_city",
+ "dataType": "STRING"
+ },
+ {
+ "name": "group.group_country",
+ "dataType": "STRING"
+ },
+ {
+ "name": "group.group_id",
+ "dataType": "INT"
+ },
+ {
+ "name": "group.group_name",
+ "dataType": "STRING"
+ },
+ {
+ "name": "group.group_lon",
+ "dataType": "DOUBLE"
+ },
+ {
+ "name": "group.group_urlname",
+ "dataType": "STRING"
+ },
+ {
+ "name": "group.group_state",
+ "dataType": "STRING"
+ },
+ {
+ "name": "group.group_lat",
+ "dataType": "DOUBLE"
+ },
+ {
+ "name": "rsvp_id",
+ "dataType": "LONG"
+ }
+ ],
+ "metricFieldSpecs": [
+ {
+ "name": "guests",
+ "dataType": "INT"
+ }
+ ],
+ "dateTimeFieldSpecs": [
+ {
+ "name": "mtime",
+ "dataType": "LONG",
+ "format": "1:MILLISECONDS:EPOCH",
+ "granularity": "1:MILLISECONDS"
+ }
+ ]
+}
diff --git
a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/json_meetupRsvp_realtime_table_config.json
b/pinot-tools/src/main/resources/examples/stream/meetupRsvpJson/meetupRsvpJson_realtime_table_config.json
similarity index 94%
rename from
pinot-tools/src/main/resources/examples/stream/meetupRsvp/json_meetupRsvp_realtime_table_config.json
rename to
pinot-tools/src/main/resources/examples/stream/meetupRsvpJson/meetupRsvpJson_realtime_table_config.json
index 7b20d696e1..6ab8dd4f36 100644
---
a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/json_meetupRsvp_realtime_table_config.json
+++
b/pinot-tools/src/main/resources/examples/stream/meetupRsvpJson/meetupRsvpJson_realtime_table_config.json
@@ -1,5 +1,5 @@
{
- "tableName": "meetupRsvp",
+ "tableName": "meetupRsvpJson",
"tableType": "REALTIME",
"tenants": {},
"segmentsConfig": {
@@ -16,7 +16,7 @@
{
"streamType": "kafka",
"stream.kafka.consumer.type": "lowLevel",
- "stream.kafka.topic.name": "meetupRSVPEvents",
+ "stream.kafka.topic.name": "meetupRSVPJsonEvents",
"stream.kafka.decoder.class.name":
"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"stream.kafka.hlc.zk.connect.string": "localhost:2191/kafka",
"stream.kafka.consumer.factory.class.name":
"org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
diff --git
a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/json_meetupRsvp_schema.json
b/pinot-tools/src/main/resources/examples/stream/meetupRsvpJson/meetupRsvpJson_schema.json
similarity index 95%
rename from
pinot-tools/src/main/resources/examples/stream/meetupRsvp/json_meetupRsvp_schema.json
rename to
pinot-tools/src/main/resources/examples/stream/meetupRsvpJson/meetupRsvpJson_schema.json
index afeee95f6b..23753ffb02 100644
---
a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/json_meetupRsvp_schema.json
+++
b/pinot-tools/src/main/resources/examples/stream/meetupRsvpJson/meetupRsvpJson_schema.json
@@ -1,5 +1,5 @@
{
- "schemaName": "meetupRsvp",
+ "schemaName": "meetupRsvpJson",
"dimensionFieldSpecs": [
{
"name": "event_json",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]