This is an automated email from the ASF dual-hosted git repository.

kishoreg pushed a commit to branch stream-api-to-spi
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 57c9ad46a67321653053d99ba5bd41028f246386
Author: kishoreg <[email protected]>
AuthorDate: Tue Dec 3 22:40:57 2019 -0800

    Moving stream related apis to pinot-spi
---
 .../broker/routing/RoutingTableBuilderFactory.java |  2 +-
 .../apache/pinot/common/config/QuotaConfig.java    |  2 +-
 .../SegmentsValidationAndRetentionConfig.java      |  2 +-
 .../apache/pinot/common/utils/DataSizeTest.java    |  1 +
 .../org/apache/pinot/common/utils/UtilsTest.java   |  2 +-
 .../impl/kafka/KafkaConnectionHandler.java         |  6 +++---
 .../realtime/impl/kafka/KafkaConsumerFactory.java  |  8 ++++----
 .../impl/kafka/KafkaHighLevelStreamConfig.java     |  4 ++--
 .../impl/kafka/KafkaLowLevelStreamConfig.java      |  2 +-
 .../impl/kafka/KafkaPartitionLevelConsumer.java    |  6 +++---
 .../impl/kafka/KafkaStreamLevelConsumer.java       |  8 ++++----
 .../impl/kafka/KafkaStreamMetadataProvider.java    |  6 +++---
 .../impl/kafka/SimpleConsumerMessageBatch.java     |  2 +-
 .../impl/kafka/server/KafkaDataProducer.java       |  2 +-
 .../kafka/server/KafkaDataServerStartable.java     |  2 +-
 ....apache.pinot.spi.stream.StreamConsumerFactory} |  0
 .../impl/kafka/KafkaLowLevelStreamConfigTest.java  |  4 ++--
 .../kafka/KafkaPartitionLevelConsumerTest.java     |  8 ++------
 .../realtime/impl/kafka2/KafkaConsumerFactory.java |  8 ++++----
 .../realtime/impl/kafka2/KafkaMessageBatch.java    |  2 +-
 .../KafkaPartitionLevelConnectionHandler.java      |  2 +-
 .../impl/kafka2/KafkaPartitionLevelConsumer.java   |  6 +++---
 .../kafka2/KafkaPartitionLevelStreamConfig.java    |  2 +-
 .../impl/kafka2/KafkaStreamLevelConsumer.java      |  8 ++++----
 .../impl/kafka2/KafkaStreamLevelStreamConfig.java  |  4 ++--
 .../impl/kafka2/KafkaStreamMetadataProvider.java   |  6 +++---
 .../impl/kafka2/server/KafkaDataProducer.java      |  2 +-
 .../kafka2/server/KafkaDataServerStartable.java    |  2 +-
 ....apache.pinot.spi.stream.StreamConsumerFactory} |  0
 .../kafka2/KafkaPartitionLevelConsumerTest.java    | 12 ++++++------
 .../KafkaPartitionLevelStreamConfigTest.java       |  4 ++--
 .../impl/kafka/KafkaAvroMessageDecoder.java        |  2 +-
 .../impl/kafka/KafkaJSONMessageDecoder.java        |  2 +-
 .../realtime/impl/kafka/KafkaStarterUtils.java     |  6 +++---
 .../impl/kafka/KafkaStreamConfigProperties.java    |  2 +-
 .../controller/api/upload/SegmentValidator.java    |  2 +-
 .../helix/core/PinotHelixResourceManager.java      |  5 +++--
 .../helix/core/PinotTableIdealStateBuilder.java    |  4 ++--
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 20 +++++++++++---------
 .../core/realtime/PinotRealtimeSegmentManager.java |  2 +-
 .../segment/DefaultFlushThresholdUpdater.java      |  2 +-
 .../segment/FlushThresholdUpdateManager.java       |  2 +-
 .../realtime/segment/FlushThresholdUpdater.java    |  2 +-
 .../SegmentSizeBasedFlushThresholdUpdater.java     |  4 ++--
 .../helix/core/rebalance/TableRebalancer.java      |  2 +-
 .../core/relocation/RealtimeSegmentRelocator.java  |  4 ++--
 .../retention/strategy/TimeRetentionStrategy.java  |  2 +-
 .../controller/util/TableRetentionValidator.java   |  2 +-
 .../validation/OfflineSegmentIntervalChecker.java  |  2 +-
 .../RealtimeSegmentValidationManager.java          |  8 ++++----
 .../controller/validation/StorageQuotaChecker.java |  2 +-
 .../api/PinotTableRestletResourceTest.java         |  2 +-
 .../pinot/controller/api/TableViewsTest.java       |  2 +-
 .../PinotLLCRealtimeSegmentManagerTest.java        |  9 +++++----
 .../segment/FlushThresholdUpdaterTest.java         |  4 ++--
 .../realtime/HLRealtimeSegmentDataManager.java     |  8 ++++----
 .../realtime/LLRealtimeSegmentDataManager.java     | 22 +++++++++++-----------
 .../data/recordtransformer/TimeTransformer.java    |  2 +-
 .../core/indexsegment/mutable/MutableSegment.java  |  2 +-
 .../indexsegment/mutable/MutableSegmentImpl.java   |  2 +-
 .../realtime/stream/SimpleAvroMessageDecoder.java  |  1 +
 .../creator/impl/SegmentColumnarIndexCreator.java  |  2 +-
 .../core/segment/index/SegmentMetadataImpl.java    |  2 +-
 .../apache/pinot/core/util/ReplicationUtils.java   |  3 +--
 .../realtime/LLRealtimeSegmentDataManagerTest.java |  4 ++--
 .../pinot/core/data/readers/PinotSegmentUtil.java  |  2 +-
 .../MutableSegmentImplAggregateMetricsTest.java    |  2 +-
 .../mutable/MutableSegmentImplTest.java            |  3 +--
 .../fakestream/FakePartitionLevelConsumer.java     |  6 +++---
 .../impl/fakestream/FakeStreamConfigUtils.java     |  4 ++--
 .../impl/fakestream/FakeStreamConsumerFactory.java | 22 ++++++++++------------
 .../impl/fakestream/FakeStreamLevelConsumer.java   |  2 +-
 .../impl/fakestream/FakeStreamMessageBatch.java    |  2 +-
 .../impl/fakestream/FakeStreamMessageDecoder.java  |  2 +-
 .../fakestream/FakeStreamMetadataProvider.java     |  8 +++-----
 .../stream/MetadataEqualsHashCodeTest.java         |  1 +
 .../core/realtime/stream/OffsetCriteriaTest.java   |  1 +
 .../core/realtime/stream/StreamConfigTest.java     |  8 ++++++--
 .../SegmentGenerationWithTimeColumnTest.java       |  2 +-
 .../apache/pinot/queries/TransformQueriesTest.java |  2 +-
 .../hadoop/job/mappers/SegmentCreationMapper.java  |  2 +-
 .../spark/jobs/SparkSegmentCreationFunction.java   |  2 +-
 .../tests/BaseClusterIntegrationTest.java          |  2 +-
 .../tests/ClusterIntegrationTestUtils.java         |  4 ++--
 .../pinot/integration/tests/ClusterTest.java       |  6 +++---
 ...lakyConsumerRealtimeClusterIntegrationTest.java | 12 +++++-------
 ...ridClusterIntegrationTestCommandLineRunner.java |  2 +-
 .../perf/BenchmarkRealtimeConsumptionSpeed.java    |  2 +-
 .../org/apache/pinot/perf/RealtimeStressTest.java  |  2 +-
 .../org/apache/pinot/spi}/stream/MessageBatch.java |  2 +-
 .../apache/pinot/spi}/stream/OffsetCriteria.java   |  4 ++--
 .../pinot/spi}/stream/PartitionCountFetcher.java   |  2 +-
 .../pinot/spi}/stream/PartitionLevelConsumer.java  |  2 +-
 .../spi}/stream/PartitionLevelStreamConfig.java    |  9 ++-------
 .../pinot/spi}/stream/PartitionOffsetFetcher.java  |  2 +-
 .../spi}/stream/PermanentConsumerException.java    |  2 +-
 .../org/apache/pinot/spi}/stream/RowMetadata.java  |  2 +-
 .../org/apache/pinot/spi}/stream/StreamConfig.java | 11 +++--------
 .../pinot/spi}/stream/StreamConfigProperties.java  |  2 +-
 .../pinot/spi}/stream/StreamConsumerFactory.java   |  4 +---
 .../spi}/stream/StreamConsumerFactoryProvider.java |  6 +++---
 .../pinot/spi}/stream/StreamDataProducer.java      |  2 +-
 .../pinot/spi}/stream/StreamDataProvider.java      |  2 +-
 .../spi}/stream/StreamDataServerStartable.java     |  2 +-
 .../pinot/spi}/stream/StreamDecoderProvider.java   |  6 +++---
 .../pinot/spi}/stream/StreamLevelConsumer.java     |  2 +-
 .../pinot/spi}/stream/StreamMessageDecoder.java    |  2 +-
 .../pinot/spi}/stream/StreamMessageMetadata.java   |  2 +-
 .../pinot/spi}/stream/StreamMetadataProvider.java  |  2 +-
 .../spi}/stream/TransientConsumerException.java    |  2 +-
 .../java/org/apache/pinot/spi}/utils/DataSize.java |  2 +-
 .../org/apache/pinot/spi/utils}/TimeUtils.java     |  2 +-
 .../org/apache/pinot/tools/HybridQuickstart.java   |  4 ++--
 .../org/apache/pinot/tools/RealtimeQuickStart.java |  4 ++--
 .../OfflineSegmentIntervalCheckerCommand.java      |  2 +-
 .../command/RealtimeProvisioningHelperCommand.java |  4 ++--
 .../tools/admin/command/StartKafkaCommand.java     |  4 ++--
 .../admin/command/StreamAvroIntoKafkaCommand.java  |  4 ++--
 .../realtime/provisioning/MemoryEstimator.java     |  2 +-
 .../pinot/tools/streams/AirlineDataStream.java     |  4 ++--
 .../pinot/tools/streams/MeetupRsvpStream.java      |  6 +++---
 121 files changed, 232 insertions(+), 244 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingTableBuilderFactory.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingTableBuilderFactory.java
index 1624597..4b7061b 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingTableBuilderFactory.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingTableBuilderFactory.java
@@ -34,7 +34,7 @@ import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/config/QuotaConfig.java 
b/pinot-common/src/main/java/org/apache/pinot/common/config/QuotaConfig.java
index 20414a4..a7f81e4 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/config/QuotaConfig.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/config/QuotaConfig.java
@@ -24,7 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonPropertyDescription;
 import javax.annotation.Nullable;
 import org.apache.commons.configuration.ConfigurationRuntimeException;
-import org.apache.pinot.common.utils.DataSize;
+import org.apache.pinot.spi.utils.DataSize;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/config/SegmentsValidationAndRetentionConfig.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/config/SegmentsValidationAndRetentionConfig.java
index 09a3828..8a23525 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/config/SegmentsValidationAndRetentionConfig.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/config/SegmentsValidationAndRetentionConfig.java
@@ -20,7 +20,7 @@ package org.apache.pinot.common.config;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import java.util.concurrent.TimeUnit;
-import org.apache.pinot.common.utils.time.TimeUtils;
+import org.apache.pinot.spi.utils.TimeUtils;
 import org.apache.pinot.startree.hll.HllConfig;
 
 
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/utils/DataSizeTest.java 
b/pinot-common/src/test/java/org/apache/pinot/common/utils/DataSizeTest.java
index 1b3f317..f0fbce1 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/utils/DataSizeTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/DataSizeTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.common.utils;
 
+import org.apache.pinot.spi.utils.DataSize;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/utils/UtilsTest.java 
b/pinot-common/src/test/java/org/apache/pinot/common/utils/UtilsTest.java
index a9a02a2..fb471a1 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/utils/UtilsTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/UtilsTest.java
@@ -20,7 +20,7 @@ package org.apache.pinot.common.utils;
 
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.Utils;
-import org.apache.pinot.common.utils.time.TimeUtils;
+import org.apache.pinot.spi.utils.TimeUtils;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
diff --git 
a/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaConnectionHandler.java
 
b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaConnectionHandler.java
index fe363f8..d225bb7 100644
--- 
a/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaConnectionHandler.java
+++ 
b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaConnectionHandler.java
@@ -34,9 +34,9 @@ import kafka.javaapi.TopicMetadataResponse;
 import kafka.javaapi.consumer.SimpleConsumer;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.pinot.core.realtime.stream.PermanentConsumerException;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
-import org.apache.pinot.core.realtime.stream.TransientConsumerException;
+import org.apache.pinot.spi.stream.PermanentConsumerException;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.TransientConsumerException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaConsumerFactory.java
 
b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaConsumerFactory.java
index cc2350c..b57cc8a 100644
--- 
a/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaConsumerFactory.java
+++ 
b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaConsumerFactory.java
@@ -19,10 +19,10 @@
 package org.apache.pinot.core.realtime.impl.kafka;
 
 import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.core.realtime.stream.PartitionLevelConsumer;
-import org.apache.pinot.core.realtime.stream.StreamConsumerFactory;
-import org.apache.pinot.core.realtime.stream.StreamLevelConsumer;
-import org.apache.pinot.core.realtime.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.stream.PartitionLevelConsumer;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamLevelConsumer;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
 
 
 /**
diff --git 
a/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaHighLevelStreamConfig.java
 
b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaHighLevelStreamConfig.java
index 0fa235e..9ae9377 100644
--- 
a/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaHighLevelStreamConfig.java
+++ 
b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaHighLevelStreamConfig.java
@@ -24,8 +24,8 @@ import java.util.Map;
 import java.util.Properties;
 import kafka.consumer.ConsumerConfig;
 import org.apache.pinot.spi.utils.EqualityUtils;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
-import org.apache.pinot.core.realtime.stream.StreamConfigProperties;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
 
 
 /**
diff --git 
a/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaLowLevelStreamConfig.java
 
b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaLowLevelStreamConfig.java
index 0bf2b73..e5bbee1 100644
--- 
a/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaLowLevelStreamConfig.java
+++ 
b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaLowLevelStreamConfig.java
@@ -22,7 +22,7 @@ import com.google.common.base.Preconditions;
 import java.util.Map;
 import org.apache.commons.lang.StringUtils;
 import org.apache.pinot.spi.utils.EqualityUtils;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfig;
 
 
 /**
diff --git 
a/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaPartitionLevelConsumer.java
 
b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaPartitionLevelConsumer.java
index 003153c..58332ae 100644
--- 
a/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaPartitionLevelConsumer.java
+++ 
b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaPartitionLevelConsumer.java
@@ -25,9 +25,9 @@ import kafka.api.FetchRequestBuilder;
 import kafka.javaapi.FetchResponse;
 import kafka.javaapi.message.ByteBufferMessageSet;
 import kafka.message.MessageAndOffset;
-import org.apache.pinot.core.realtime.stream.MessageBatch;
-import org.apache.pinot.core.realtime.stream.PartitionLevelConsumer;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.PartitionLevelConsumer;
+import org.apache.pinot.spi.stream.StreamConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStreamLevelConsumer.java
 
b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStreamLevelConsumer.java
index 92f6d45..c628ac6 100644
--- 
a/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStreamLevelConsumer.java
+++ 
b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStreamLevelConsumer.java
@@ -20,10 +20,10 @@ package org.apache.pinot.core.realtime.impl.kafka;
 
 import kafka.consumer.ConsumerIterator;
 import kafka.javaapi.consumer.ConsumerConnector;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
-import org.apache.pinot.core.realtime.stream.StreamDecoderProvider;
-import org.apache.pinot.core.realtime.stream.StreamLevelConsumer;
-import org.apache.pinot.core.realtime.stream.StreamMessageDecoder;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamDecoderProvider;
+import org.apache.pinot.spi.stream.StreamLevelConsumer;
+import org.apache.pinot.spi.stream.StreamMessageDecoder;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.slf4j.Logger;
diff --git 
a/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStreamMetadataProvider.java
 
b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStreamMetadataProvider.java
index a4b9bfb..abd4e7d 100644
--- 
a/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStreamMetadataProvider.java
+++ 
b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStreamMetadataProvider.java
@@ -34,9 +34,9 @@ import kafka.javaapi.TopicMetadataRequest;
 import kafka.javaapi.TopicMetadataResponse;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.pinot.core.realtime.stream.OffsetCriteria;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
-import org.apache.pinot.core.realtime.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/SimpleConsumerMessageBatch.java
 
b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/SimpleConsumerMessageBatch.java
index ddb39cc..4159d0f 100644
--- 
a/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/SimpleConsumerMessageBatch.java
+++ 
b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/SimpleConsumerMessageBatch.java
@@ -20,7 +20,7 @@ package org.apache.pinot.core.realtime.impl.kafka;
 
 import java.util.ArrayList;
 import kafka.message.MessageAndOffset;
-import org.apache.pinot.core.realtime.stream.MessageBatch;
+import org.apache.pinot.spi.stream.MessageBatch;
 
 
 public class SimpleConsumerMessageBatch implements MessageBatch<byte[]> {
diff --git 
a/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/server/KafkaDataProducer.java
 
b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/server/KafkaDataProducer.java
index 0eb4ac6..8afcf9a 100644
--- 
a/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/server/KafkaDataProducer.java
+++ 
b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/server/KafkaDataProducer.java
@@ -22,7 +22,7 @@ import java.util.Properties;
 import kafka.javaapi.producer.Producer;
 import kafka.producer.KeyedMessage;
 import kafka.producer.ProducerConfig;
-import org.apache.pinot.core.realtime.stream.StreamDataProducer;
+import org.apache.pinot.spi.stream.StreamDataProducer;
 
 
 public class KafkaDataProducer implements StreamDataProducer {
diff --git 
a/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/server/KafkaDataServerStartable.java
 
b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/server/KafkaDataServerStartable.java
index 1c2a8ff..d8da7f0 100644
--- 
a/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/server/KafkaDataServerStartable.java
+++ 
b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/server/KafkaDataServerStartable.java
@@ -26,7 +26,7 @@ import kafka.server.KafkaConfig;
 import kafka.server.KafkaServerStartable;
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.commons.io.FileUtils;
-import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;
+import org.apache.pinot.spi.stream.StreamDataServerStartable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/pinot-connectors/pinot-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.pinot.core.realtime.stream.StreamConsumerFactory
 
b/pinot-connectors/pinot-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.pinot.spi.stream.StreamConsumerFactory
similarity index 100%
rename from 
pinot-connectors/pinot-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.pinot.core.realtime.stream.StreamConsumerFactory
rename to 
pinot-connectors/pinot-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.pinot.spi.stream.StreamConsumerFactory
diff --git 
a/pinot-connectors/pinot-connector-kafka-0.9/src/test/java/org/apache/pinot/core/realtime/impl/kafka/KafkaLowLevelStreamConfigTest.java
 
b/pinot-connectors/pinot-connector-kafka-0.9/src/test/java/org/apache/pinot/core/realtime/impl/kafka/KafkaLowLevelStreamConfigTest.java
index d224d4d..dd90165 100644
--- 
a/pinot-connectors/pinot-connector-kafka-0.9/src/test/java/org/apache/pinot/core/realtime/impl/kafka/KafkaLowLevelStreamConfigTest.java
+++ 
b/pinot-connectors/pinot-connector-kafka-0.9/src/test/java/org/apache/pinot/core/realtime/impl/kafka/KafkaLowLevelStreamConfigTest.java
@@ -20,8 +20,8 @@ package org.apache.pinot.core.realtime.impl.kafka;
 
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
-import org.apache.pinot.core.realtime.stream.StreamConfigProperties;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
diff --git 
a/pinot-connectors/pinot-connector-kafka-0.9/src/test/java/org/apache/pinot/core/realtime/impl/kafka/KafkaPartitionLevelConsumerTest.java
 
b/pinot-connectors/pinot-connector-kafka-0.9/src/test/java/org/apache/pinot/core/realtime/impl/kafka/KafkaPartitionLevelConsumerTest.java
index 9b6b950..10e0a99 100644
--- 
a/pinot-connectors/pinot-connector-kafka-0.9/src/test/java/org/apache/pinot/core/realtime/impl/kafka/KafkaPartitionLevelConsumerTest.java
+++ 
b/pinot-connectors/pinot-connector-kafka-0.9/src/test/java/org/apache/pinot/core/realtime/impl/kafka/KafkaPartitionLevelConsumerTest.java
@@ -37,12 +37,8 @@ import kafka.javaapi.consumer.SimpleConsumer;
 import kafka.javaapi.message.ByteBufferMessageSet;
 import kafka.message.Message;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaPartitionLevelConsumer;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaSimpleConsumerFactory;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStreamConfigProperties;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStreamMetadataProvider;
-import org.apache.pinot.core.realtime.stream.OffsetCriteria;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
+import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.StreamConfig;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import scala.Some;
diff --git 
a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaConsumerFactory.java
 
b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaConsumerFactory.java
index 3afee0d..06aa33e 100644
--- 
a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaConsumerFactory.java
+++ 
b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaConsumerFactory.java
@@ -19,10 +19,10 @@
 package org.apache.pinot.core.realtime.impl.kafka2;
 
 import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.core.realtime.stream.PartitionLevelConsumer;
-import org.apache.pinot.core.realtime.stream.StreamConsumerFactory;
-import org.apache.pinot.core.realtime.stream.StreamLevelConsumer;
-import org.apache.pinot.core.realtime.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.stream.PartitionLevelConsumer;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamLevelConsumer;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
 
 
 public class KafkaConsumerFactory extends StreamConsumerFactory {
diff --git 
a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaMessageBatch.java
 
b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaMessageBatch.java
index abd801b..511fd90 100644
--- 
a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaMessageBatch.java
+++ 
b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaMessageBatch.java
@@ -23,7 +23,7 @@ import java.util.List;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.pinot.core.realtime.impl.kafka.MessageAndOffset;
-import org.apache.pinot.core.realtime.stream.MessageBatch;
+import org.apache.pinot.spi.stream.MessageBatch;
 
 
 public class KafkaMessageBatch implements MessageBatch<byte[]> {
diff --git 
a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelConnectionHandler.java
 
b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelConnectionHandler.java
index 0a80a30..811d1c4 100644
--- 
a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelConnectionHandler.java
+++ 
b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelConnectionHandler.java
@@ -29,7 +29,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.BytesDeserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfig;
 
 
 /**
diff --git 
a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelConsumer.java
 
b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelConsumer.java
index 014ff88..25abec7 100644
--- 
a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelConsumer.java
+++ 
b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelConsumer.java
@@ -26,9 +26,9 @@ import java.util.concurrent.TimeoutException;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.pinot.core.realtime.stream.MessageBatch;
-import org.apache.pinot.core.realtime.stream.PartitionLevelConsumer;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.PartitionLevelConsumer;
+import org.apache.pinot.spi.stream.StreamConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelStreamConfig.java
 
b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelStreamConfig.java
index 7501904..be9001c 100644
--- 
a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelStreamConfig.java
+++ 
b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelStreamConfig.java
@@ -23,7 +23,7 @@ import java.util.Map;
 import org.apache.commons.lang.StringUtils;
 import org.apache.pinot.spi.utils.EqualityUtils;
 import org.apache.pinot.core.realtime.impl.kafka.KafkaStreamConfigProperties;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfig;
 
 
 /**
diff --git 
a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamLevelConsumer.java
 
b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamLevelConsumer.java
index c06656e..33d6ef6 100644
--- 
a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamLevelConsumer.java
+++ 
b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamLevelConsumer.java
@@ -30,10 +30,10 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
-import org.apache.pinot.core.realtime.stream.StreamDecoderProvider;
-import org.apache.pinot.core.realtime.stream.StreamLevelConsumer;
-import org.apache.pinot.core.realtime.stream.StreamMessageDecoder;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamDecoderProvider;
+import org.apache.pinot.spi.stream.StreamLevelConsumer;
+import org.apache.pinot.spi.stream.StreamMessageDecoder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamLevelStreamConfig.java
 
b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamLevelStreamConfig.java
index e04cce3..d5d556f 100644
--- 
a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamLevelStreamConfig.java
+++ 
b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamLevelStreamConfig.java
@@ -25,8 +25,8 @@ import java.util.Properties;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.pinot.spi.utils.EqualityUtils;
 import org.apache.pinot.core.realtime.impl.kafka.KafkaStreamConfigProperties;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
-import org.apache.pinot.core.realtime.stream.StreamConfigProperties;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
 
 
 /**
diff --git 
a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamMetadataProvider.java
 
b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamMetadataProvider.java
index 7d106ed..de3570b 100644
--- 
a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamMetadataProvider.java
+++ 
b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamMetadataProvider.java
@@ -24,9 +24,9 @@ import java.time.Duration;
 import java.util.Collections;
 import java.util.concurrent.TimeoutException;
 import javax.annotation.Nonnull;
-import org.apache.pinot.core.realtime.stream.OffsetCriteria;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
-import org.apache.pinot.core.realtime.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
 
 
 public class KafkaStreamMetadataProvider extends 
KafkaPartitionLevelConnectionHandler implements StreamMetadataProvider {
diff --git 
a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/server/KafkaDataProducer.java
 
b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/server/KafkaDataProducer.java
index eebd89729..c5c8ed3 100644
--- 
a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/server/KafkaDataProducer.java
+++ 
b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/server/KafkaDataProducer.java
@@ -22,7 +22,7 @@ import java.util.Properties;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.pinot.core.realtime.stream.StreamDataProducer;
+import org.apache.pinot.spi.stream.StreamDataProducer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/server/KafkaDataServerStartable.java
 
b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/server/KafkaDataServerStartable.java
index c666b96..042b8be 100644
--- 
a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/server/KafkaDataServerStartable.java
+++ 
b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/server/KafkaDataServerStartable.java
@@ -33,7 +33,7 @@ import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.KafkaAdminClient;
 import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;
+import org.apache.pinot.spi.stream.StreamDataServerStartable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/pinot-connectors/pinot-connector-kafka-2.0/src/main/resources/META-INF/services/org.apache.pinot.core.realtime.stream.StreamConsumerFactory
 
b/pinot-connectors/pinot-connector-kafka-2.0/src/main/resources/META-INF/services/org.apache.pinot.spi.stream.StreamConsumerFactory
similarity index 100%
rename from 
pinot-connectors/pinot-connector-kafka-2.0/src/main/resources/META-INF/services/org.apache.pinot.core.realtime.stream.StreamConsumerFactory
rename to 
pinot-connectors/pinot-connector-kafka-2.0/src/main/resources/META-INF/services/org.apache.pinot.spi.stream.StreamConsumerFactory
diff --git 
a/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelConsumerTest.java
 
b/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelConsumerTest.java
index 1eb6c64..d616b5b 100644
--- 
a/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelConsumerTest.java
+++ 
b/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelConsumerTest.java
@@ -28,12 +28,12 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.pinot.core.realtime.impl.kafka.KafkaStreamConfigProperties;
 import org.apache.pinot.core.realtime.impl.kafka2.utils.MiniKafkaCluster;
-import org.apache.pinot.core.realtime.stream.MessageBatch;
-import org.apache.pinot.core.realtime.stream.OffsetCriteria;
-import org.apache.pinot.core.realtime.stream.PartitionLevelConsumer;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
-import org.apache.pinot.core.realtime.stream.StreamConsumerFactory;
-import org.apache.pinot.core.realtime.stream.StreamConsumerFactoryProvider;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionLevelConsumer;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
diff --git 
a/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelStreamConfigTest.java
 
b/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelStreamConfigTest.java
index 8358f98..03bbadc 100644
--- 
a/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelStreamConfigTest.java
+++ 
b/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelStreamConfigTest.java
@@ -21,8 +21,8 @@ package org.apache.pinot.core.realtime.impl.kafka2;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.pinot.core.realtime.impl.kafka.KafkaStreamConfigProperties;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
-import org.apache.pinot.core.realtime.stream.StreamConfigProperties;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
diff --git 
a/pinot-connectors/pinot-connector-kafka-base/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaAvroMessageDecoder.java
 
b/pinot-connectors/pinot-connector-kafka-base/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaAvroMessageDecoder.java
index e240c1b..ed365cb 100644
--- 
a/pinot-connectors/pinot-connector-kafka-base/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaAvroMessageDecoder.java
+++ 
b/pinot-connectors/pinot-connector-kafka-base/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaAvroMessageDecoder.java
@@ -42,7 +42,7 @@ import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.common.utils.retry.RetryPolicies;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.core.realtime.stream.AvroRecordToPinotRowGenerator;
-import org.apache.pinot.core.realtime.stream.StreamMessageDecoder;
+import org.apache.pinot.spi.stream.StreamMessageDecoder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/pinot-connectors/pinot-connector-kafka-base/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaJSONMessageDecoder.java
 
b/pinot-connectors/pinot-connector-kafka-base/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaJSONMessageDecoder.java
index 5f9d463..8d6812d 100644
--- 
a/pinot-connectors/pinot-connector-kafka-base/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaJSONMessageDecoder.java
+++ 
b/pinot-connectors/pinot-connector-kafka-base/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaJSONMessageDecoder.java
@@ -26,7 +26,7 @@ import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.TimeFieldSpec;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.core.realtime.stream.StreamMessageDecoder;
+import org.apache.pinot.spi.stream.StreamMessageDecoder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/pinot-connectors/pinot-connector-kafka-base/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStarterUtils.java
 
b/pinot-connectors/pinot-connector-kafka-base/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStarterUtils.java
index 4881caf..efd8374 100644
--- 
a/pinot-connectors/pinot-connector-kafka-base/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStarterUtils.java
+++ 
b/pinot-connectors/pinot-connector-kafka-base/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStarterUtils.java
@@ -23,9 +23,9 @@ import java.util.List;
 import java.util.Properties;
 import java.util.ServiceLoader;
 import org.apache.pinot.common.utils.ZkStarter;
-import org.apache.pinot.core.realtime.stream.StreamConsumerFactory;
-import org.apache.pinot.core.realtime.stream.StreamDataProvider;
-import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamDataProvider;
+import org.apache.pinot.spi.stream.StreamDataServerStartable;
 
 
 public class KafkaStarterUtils {
diff --git 
a/pinot-connectors/pinot-connector-kafka-base/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStreamConfigProperties.java
 
b/pinot-connectors/pinot-connector-kafka-base/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStreamConfigProperties.java
index 03182ac..2337d61 100644
--- 
a/pinot-connectors/pinot-connector-kafka-base/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStreamConfigProperties.java
+++ 
b/pinot-connectors/pinot-connector-kafka-base/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStreamConfigProperties.java
@@ -19,7 +19,7 @@
 package org.apache.pinot.core.realtime.impl.kafka;
 
 import com.google.common.base.Joiner;
-import org.apache.pinot.core.realtime.stream.StreamConfigProperties;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
 
 
 /**
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java
index db071a4..3f72320 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java
@@ -27,7 +27,7 @@ import 
org.apache.pinot.common.exception.InvalidConfigException;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.segment.SegmentMetadata;
-import org.apache.pinot.common.utils.time.TimeUtils;
+import org.apache.pinot.spi.utils.TimeUtils;
 import org.apache.pinot.controller.ControllerConf;
 import 
org.apache.pinot.controller.api.resources.ControllerApplicationException;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index b6f3c2a..1477396 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -104,7 +104,7 @@ import 
org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
 import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
 import org.apache.pinot.controller.helix.core.util.ZKMetadataUtils;
 import org.apache.pinot.controller.helix.starter.HelixConfig;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -1150,7 +1150,8 @@ public class PinotHelixResourceManager {
 
   private void ensureRealtimeClusterIsSetUp(TableConfig realtimeTableConfig) {
     String realtimeTableName = realtimeTableConfig.getTableName();
-    StreamConfig streamConfig = new StreamConfig(realtimeTableConfig);
+    StreamConfig streamConfig = new 
StreamConfig(realtimeTableConfig.getTableName(),
+        realtimeTableConfig.getIndexingConfig().getStreamConfigs());
     IdealState idealState = getTableIdealState(realtimeTableName);
 
     if (streamConfig.hasHighLevelConsumerType()) {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
index 042c27f..892c24c 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
@@ -33,8 +33,8 @@ import org.apache.pinot.common.utils.StringUtil;
 import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.common.utils.retry.RetryPolicies;
 import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
-import org.apache.pinot.core.realtime.stream.PartitionCountFetcher;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
+import org.apache.pinot.spi.stream.PartitionCountFetcher;
+import org.apache.pinot.spi.stream.StreamConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index e4687da..7ffd35a 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -72,11 +72,11 @@ import 
org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegment
 import 
org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdateManager;
 import 
org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdater;
 import org.apache.pinot.controller.util.SegmentCompletionUtils;
-import org.apache.pinot.core.realtime.stream.OffsetCriteria;
-import org.apache.pinot.core.realtime.stream.PartitionLevelStreamConfig;
-import org.apache.pinot.core.realtime.stream.PartitionOffsetFetcher;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
-import org.apache.pinot.core.realtime.stream.StreamConfigProperties;
+import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
+import org.apache.pinot.spi.stream.PartitionOffsetFetcher;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
 import org.apache.pinot.core.segment.index.SegmentMetadataImpl;
 import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.filesystem.PinotFSFactory;
@@ -206,7 +206,8 @@ public class PinotLLCRealtimeSegmentManager {
 
     _flushThresholdUpdateManager.clearFlushThresholdUpdater(realtimeTableName);
 
-    PartitionLevelStreamConfig streamConfig = new 
PartitionLevelStreamConfig(tableConfig);
+    PartitionLevelStreamConfig streamConfig =
+        new PartitionLevelStreamConfig(tableConfig.getTableName(), 
tableConfig.getIndexingConfig().getStreamConfigs());
     InstancePartitions instancePartitions = 
getConsumingInstancePartitions(tableConfig);
     int numPartitions = getNumPartitions(streamConfig);
     int numReplicas = getNumReplicas(tableConfig, instancePartitions);
@@ -432,9 +433,10 @@ public class PinotLLCRealtimeSegmentManager {
     long newSegmentCreationTimeMs = getCurrentTimeMs();
     LLCSegmentName newLLCSegmentName =
         getNextLLCSegmentName(new LLCSegmentName(committingSegmentName), 
newSegmentCreationTimeMs);
-    createNewSegmentZKMetadata(tableConfig, new 
PartitionLevelStreamConfig(tableConfig), newLLCSegmentName,
-        newSegmentCreationTimeMs, committingSegmentDescriptor, 
committingSegmentZKMetadata, instancePartitions,
-        numPartitions, numReplicas);
+    createNewSegmentZKMetadata(tableConfig,
+        new PartitionLevelStreamConfig(tableConfig.getTableName(), 
tableConfig.getIndexingConfig().getStreamConfigs()),
+        newLLCSegmentName, newSegmentCreationTimeMs, 
committingSegmentDescriptor, committingSegmentZKMetadata,
+        instancePartitions, numPartitions, numReplicas);
 
     // Step-3
     SegmentAssignment segmentAssignment = 
SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotRealtimeSegmentManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotRealtimeSegmentManager.java
index 5a8f542..10d7f7f 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotRealtimeSegmentManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotRealtimeSegmentManager.java
@@ -56,7 +56,7 @@ import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
 import org.apache.pinot.core.query.utils.Pair;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/DefaultFlushThresholdUpdater.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/DefaultFlushThresholdUpdater.java
index e307850..5a68497 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/DefaultFlushThresholdUpdater.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/DefaultFlushThresholdUpdater.java
@@ -21,7 +21,7 @@ package 
org.apache.pinot.controller.helix.core.realtime.segment;
 import com.google.common.annotations.VisibleForTesting;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
-import org.apache.pinot.core.realtime.stream.PartitionLevelStreamConfig;
+import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
 
 
 /**
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdateManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdateManager.java
index 9f4ec2e..a166301 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdateManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdateManager.java
@@ -20,7 +20,7 @@ package 
org.apache.pinot.controller.helix.core.realtime.segment;
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import org.apache.pinot.core.realtime.stream.PartitionLevelStreamConfig;
+import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
 
 
 /**
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdater.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdater.java
index b005f47..544975a 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdater.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdater.java
@@ -20,7 +20,7 @@ package 
org.apache.pinot.controller.helix.core.realtime.segment;
 
 import javax.annotation.Nullable;
 import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
-import org.apache.pinot.core.realtime.stream.PartitionLevelStreamConfig;
+import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
 
 
 /**
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
index 12e6093..883aae2 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
@@ -22,8 +22,8 @@ import com.google.common.annotations.VisibleForTesting;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
 import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.common.utils.time.TimeUtils;
-import org.apache.pinot.core.realtime.stream.PartitionLevelStreamConfig;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index 3483deb..0ddbe6b 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -42,7 +42,7 @@ import 
org.apache.pinot.controller.helix.core.assignment.instance.InstanceAssign
 import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
index 90993da..80e4e40 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
@@ -29,7 +29,7 @@ import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.config.TagNameUtils;
 import org.apache.pinot.common.config.instance.InstanceAssignmentConfig;
 import org.apache.pinot.common.metrics.ControllerMetrics;
-import org.apache.pinot.common.utils.time.TimeUtils;
+import org.apache.pinot.spi.utils.TimeUtils;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
@@ -37,7 +37,7 @@ import 
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTas
 import 
org.apache.pinot.controller.helix.core.rebalance.RebalanceConfigConstants;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
 import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java
index 167e030..2690d67 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java
@@ -20,7 +20,7 @@ package 
org.apache.pinot.controller.helix.core.retention.strategy;
 
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
-import org.apache.pinot.common.utils.time.TimeUtils;
+import org.apache.pinot.spi.utils.TimeUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableRetentionValidator.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableRetentionValidator.java
index 0c32518..1b68e20 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableRetentionValidator.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableRetentionValidator.java
@@ -34,7 +34,7 @@ import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
-import org.apache.pinot.common.utils.time.TimeUtils;
+import org.apache.pinot.spi.utils.TimeUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
index 9a43732..3fcdefc 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
@@ -29,7 +29,7 @@ import 
org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.metrics.ValidationMetrics;
 import org.apache.pinot.common.utils.CommonConstants;
-import org.apache.pinot.common.utils.time.TimeUtils;
+import org.apache.pinot.spi.utils.TimeUtils;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index 724c6de..8acca80 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -21,7 +21,6 @@ package org.apache.pinot.controller.validation;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.config.TableNameBuilder;
@@ -36,8 +35,8 @@ import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import 
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
 import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
-import org.apache.pinot.core.realtime.stream.PartitionLevelStreamConfig;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
+import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
+import org.apache.pinot.spi.stream.StreamConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -96,7 +95,8 @@ public class RealtimeSegmentValidationManager extends 
ControllerPeriodicTask<Rea
         updateRealtimeDocumentCount(tableConfig);
       }
 
-      PartitionLevelStreamConfig streamConfig = new 
PartitionLevelStreamConfig(tableConfig);
+      PartitionLevelStreamConfig streamConfig = new 
PartitionLevelStreamConfig(tableConfig.getTableName(),
+          tableConfig.getIndexingConfig().getStreamConfigs());
       if (streamConfig.hasLowLevelConsumerType()) {
         _llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig, 
streamConfig);
       }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
index 6a3066c..65b77ab 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
@@ -27,7 +27,7 @@ import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.exception.InvalidConfigException;
 import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMetrics;
-import org.apache.pinot.common.utils.DataSize;
+import org.apache.pinot.spi.utils.DataSize;
 import org.apache.pinot.controller.util.TableSizeReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
index f03977e..8e77387 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
@@ -30,7 +30,7 @@ import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.helix.ControllerTest;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
 import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfig;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableViewsTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableViewsTest.java
index 9facd20..0aa2576 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableViewsTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableViewsTest.java
@@ -30,7 +30,7 @@ import org.apache.pinot.controller.api.resources.TableViews;
 import org.apache.pinot.controller.helix.ControllerTest;
 import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
 import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfig;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 451ccbe..6e03b4d 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -53,9 +53,9 @@ import 
org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegment
 import org.apache.pinot.controller.util.SegmentCompletionUtils;
 import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
 import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
-import org.apache.pinot.core.realtime.stream.OffsetCriteria;
-import org.apache.pinot.core.realtime.stream.PartitionLevelStreamConfig;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
+import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
+import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.core.segment.index.SegmentMetadataImpl;
 import org.apache.pinot.filesystem.PinotFSFactory;
 import org.apache.zookeeper.data.Stat;
@@ -797,7 +797,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
       _tableConfig =
           new 
TableConfig.Builder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(_numReplicas)
               .setLLC(true).setStreamConfigs(streamConfigs).build();
-      _streamConfig = new PartitionLevelStreamConfig(_tableConfig);
+      _streamConfig = new 
PartitionLevelStreamConfig(_tableConfig.getTableName(),
+          _tableConfig.getIndexingConfig().getStreamConfigs());
     }
 
     void makeConsumingInstancePartitions() {
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
index a742bf8..184557d 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
@@ -22,8 +22,8 @@ import java.util.Arrays;
 import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
 import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.core.realtime.stream.PartitionLevelStreamConfig;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
+import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
+import org.apache.pinot.spi.stream.StreamConfig;
 import org.testng.annotations.Test;
 
 import static org.mockito.Mockito.mock;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
index f80b4fc..421fa3f 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
@@ -46,10 +46,10 @@ import 
org.apache.pinot.core.indexsegment.mutable.MutableSegment;
 import org.apache.pinot.core.indexsegment.mutable.MutableSegmentImpl;
 import org.apache.pinot.core.realtime.converter.RealtimeSegmentConverter;
 import org.apache.pinot.core.realtime.impl.RealtimeSegmentConfig;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
-import org.apache.pinot.core.realtime.stream.StreamConsumerFactory;
-import org.apache.pinot.core.realtime.stream.StreamConsumerFactoryProvider;
-import org.apache.pinot.core.realtime.stream.StreamLevelConsumer;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
+import org.apache.pinot.spi.stream.StreamLevelConsumer;
 import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 8efea23..a569d45 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -41,7 +41,7 @@ import org.apache.pinot.common.config.SegmentPartitionConfig;
 import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.common.data.StarTreeIndexSpec;
-import org.apache.pinot.core.realtime.stream.RowMetadata;
+import org.apache.pinot.spi.stream.RowMetadata;
 import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
 import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
@@ -62,16 +62,16 @@ import 
org.apache.pinot.core.indexsegment.mutable.MutableSegmentImpl;
 import org.apache.pinot.core.io.readerwriter.PinotDataBufferMemoryManager;
 import org.apache.pinot.core.realtime.converter.RealtimeSegmentConverter;
 import org.apache.pinot.core.realtime.impl.RealtimeSegmentConfig;
-import org.apache.pinot.core.realtime.stream.MessageBatch;
-import org.apache.pinot.core.realtime.stream.PartitionLevelConsumer;
-import org.apache.pinot.core.realtime.stream.PartitionLevelStreamConfig;
-import org.apache.pinot.core.realtime.stream.PermanentConsumerException;
-import org.apache.pinot.core.realtime.stream.StreamConsumerFactory;
-import org.apache.pinot.core.realtime.stream.StreamConsumerFactoryProvider;
-import org.apache.pinot.core.realtime.stream.StreamDecoderProvider;
-import org.apache.pinot.core.realtime.stream.StreamMessageDecoder;
-import org.apache.pinot.core.realtime.stream.StreamMetadataProvider;
-import org.apache.pinot.core.realtime.stream.TransientConsumerException;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.PartitionLevelConsumer;
+import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
+import org.apache.pinot.spi.stream.PermanentConsumerException;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
+import org.apache.pinot.spi.stream.StreamDecoderProvider;
+import org.apache.pinot.spi.stream.StreamMessageDecoder;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.stream.TransientConsumerException;
 import org.apache.pinot.core.segment.creator.impl.V1Constants;
 import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/TimeTransformer.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/TimeTransformer.java
index 2c1aaa5..8c56a95 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/TimeTransformer.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/TimeTransformer.java
@@ -23,7 +23,7 @@ import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.TimeFieldSpec;
 import org.apache.pinot.spi.data.TimeGranularitySpec;
 import org.apache.pinot.common.utils.time.TimeConverter;
-import org.apache.pinot.common.utils.time.TimeUtils;
+import org.apache.pinot.spi.utils.TimeUtils;
 import org.apache.pinot.spi.data.readers.GenericRow;
 
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegment.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegment.java
index 027baf1..8fd15c1 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegment.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegment.java
@@ -19,7 +19,7 @@
 package org.apache.pinot.core.indexsegment.mutable;
 
 import javax.annotation.Nullable;
-import org.apache.pinot.core.realtime.stream.RowMetadata;
+import org.apache.pinot.spi.stream.RowMetadata;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.core.indexsegment.IndexSegment;
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
index 9259462..21930ae 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
@@ -35,7 +35,7 @@ import org.apache.pinot.spi.data.DimensionFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.MetricFieldSpec;
 import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.core.realtime.stream.RowMetadata;
+import org.apache.pinot.spi.stream.RowMetadata;
 import org.apache.pinot.common.segment.SegmentMetadata;
 import org.apache.pinot.common.utils.NetUtil;
 import org.apache.pinot.spi.data.readers.GenericRow;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/SimpleAvroMessageDecoder.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/SimpleAvroMessageDecoder.java
index 0780bb0..e450238 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/SimpleAvroMessageDecoder.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/SimpleAvroMessageDecoder.java
@@ -29,6 +29,7 @@ import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.StreamMessageDecoder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
index e9f75ca..3126262 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
@@ -37,7 +37,7 @@ import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.common.data.StarTreeIndexSpec;
 import org.apache.pinot.spi.utils.BytesUtils;
 import org.apache.pinot.common.utils.FileUtils;
-import org.apache.pinot.common.utils.time.TimeUtils;
+import org.apache.pinot.spi.utils.TimeUtils;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.core.data.partition.PartitionFunction;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/SegmentMetadataImpl.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/SegmentMetadataImpl.java
index 87bfd5d..7a3dad3 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/SegmentMetadataImpl.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/SegmentMetadataImpl.java
@@ -49,7 +49,7 @@ import 
org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
 import org.apache.pinot.common.segment.SegmentMetadata;
 import org.apache.pinot.common.segment.StarTreeMetadata;
 import org.apache.pinot.spi.utils.JsonUtils;
-import org.apache.pinot.common.utils.time.TimeUtils;
+import org.apache.pinot.spi.utils.TimeUtils;
 import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
 import org.apache.pinot.core.segment.creator.impl.V1Constants;
 import org.apache.pinot.core.segment.creator.impl.V1Constants.MetadataKeys;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/util/ReplicationUtils.java 
b/pinot-core/src/main/java/org/apache/pinot/core/util/ReplicationUtils.java
index 026c9ac..96a0d28 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/util/ReplicationUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/ReplicationUtils.java
@@ -20,9 +20,8 @@ package org.apache.pinot.core.util;
 
 import org.apache.pinot.common.config.SegmentsValidationAndRetentionConfig;
 import org.apache.pinot.common.config.TableConfig;
-import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfig;
 
 
 /**
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
index 4f97369..b1dd92a 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
@@ -39,8 +39,8 @@ import 
org.apache.pinot.core.indexsegment.mutable.MutableSegmentImpl;
 import org.apache.pinot.core.realtime.impl.RealtimeSegmentStatsHistory;
 import 
org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConsumerFactory;
 import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamMessageDecoder;
-import org.apache.pinot.core.realtime.stream.PermanentConsumerException;
-import org.apache.pinot.core.realtime.stream.StreamConfigProperties;
+import org.apache.pinot.spi.stream.PermanentConsumerException;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
 import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/readers/PinotSegmentUtil.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/readers/PinotSegmentUtil.java
index 404ed49..1c8d399 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/readers/PinotSegmentUtil.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/readers/PinotSegmentUtil.java
@@ -32,7 +32,7 @@ import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.TimeFieldSpec;
-import org.apache.pinot.common.utils.time.TimeUtils;
+import org.apache.pinot.spi.utils.TimeUtils;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import 
org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplAggregateMetricsTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplAggregateMetricsTest.java
index 3aa6b11..e08e6b3 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplAggregateMetricsTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplAggregateMetricsTest.java
@@ -31,7 +31,7 @@ import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.MetricFieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.core.realtime.stream.StreamMessageMetadata;
+import org.apache.pinot.spi.stream.StreamMessageMetadata;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTest.java
index f1b6b38..7a80371 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTest.java
@@ -23,7 +23,6 @@ import java.net.URL;
 import java.util.Collections;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.avro.data.readers.AvroRecordReader;
-import org.apache.pinot.avro.data.readers.AvroUtils;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.common.segment.ReadMode;
@@ -37,7 +36,7 @@ import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
-import org.apache.pinot.core.realtime.stream.StreamMessageMetadata;
+import org.apache.pinot.spi.stream.StreamMessageMetadata;
 import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
 import 
org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.core.segment.index.readers.Dictionary;
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakePartitionLevelConsumer.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakePartitionLevelConsumer.java
index 8699644..adad83a 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakePartitionLevelConsumer.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakePartitionLevelConsumer.java
@@ -32,9 +32,9 @@ import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.EncoderFactory;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.avro.data.readers.AvroUtils;
-import org.apache.pinot.core.realtime.stream.MessageBatch;
-import org.apache.pinot.core.realtime.stream.PartitionLevelConsumer;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.PartitionLevelConsumer;
+import org.apache.pinot.spi.stream.StreamConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConfigUtils.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConfigUtils.java
index ea3e030..c9860ab 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConfigUtils.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConfigUtils.java
@@ -27,8 +27,8 @@ import java.util.Map;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
-import org.apache.pinot.core.realtime.stream.StreamConfigProperties;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
 import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
 
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
index 678b3a6..8a21772 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
@@ -19,19 +19,17 @@
 package org.apache.pinot.core.realtime.impl.fakestream;
 
 import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
-import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.core.realtime.stream.MessageBatch;
-import org.apache.pinot.core.realtime.stream.OffsetCriteria;
-import org.apache.pinot.core.realtime.stream.PartitionLevelConsumer;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
-import org.apache.pinot.core.realtime.stream.StreamConsumerFactory;
-import org.apache.pinot.core.realtime.stream.StreamConsumerFactoryProvider;
-import org.apache.pinot.core.realtime.stream.StreamDecoderProvider;
-import org.apache.pinot.core.realtime.stream.StreamLevelConsumer;
-import org.apache.pinot.core.realtime.stream.StreamMessageDecoder;
-import org.apache.pinot.core.realtime.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionLevelConsumer;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
+import org.apache.pinot.spi.stream.StreamDecoderProvider;
+import org.apache.pinot.spi.stream.StreamLevelConsumer;
+import org.apache.pinot.spi.stream.StreamMessageDecoder;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
 
 
 /**
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamLevelConsumer.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamLevelConsumer.java
index 1e4f07a..ab675e0 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamLevelConsumer.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamLevelConsumer.java
@@ -19,7 +19,7 @@
 package org.apache.pinot.core.realtime.impl.fakestream;
 
 import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.core.realtime.stream.StreamLevelConsumer;
+import org.apache.pinot.spi.stream.StreamLevelConsumer;
 
 
 /**
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageBatch.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageBatch.java
index 4afd6da..4796b34 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageBatch.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageBatch.java
@@ -19,7 +19,7 @@
 package org.apache.pinot.core.realtime.impl.fakestream;
 
 import java.util.List;
-import org.apache.pinot.core.realtime.stream.MessageBatch;
+import org.apache.pinot.spi.stream.MessageBatch;
 
 
 /**
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageDecoder.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageDecoder.java
index c4a3f59..dceaa22 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageDecoder.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageDecoder.java
@@ -28,7 +28,7 @@ import org.apache.avro.io.DecoderFactory;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.core.realtime.stream.AvroRecordToPinotRowGenerator;
-import org.apache.pinot.core.realtime.stream.StreamMessageDecoder;
+import org.apache.pinot.spi.stream.StreamMessageDecoder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java
index a75660d..f355a00 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java
@@ -19,13 +19,11 @@
 package org.apache.pinot.core.realtime.impl.fakestream;
 
 import java.io.IOException;
-import java.util.Map;
 import java.util.concurrent.TimeoutException;
 import javax.annotation.Nonnull;
-import org.apache.pinot.core.realtime.stream.OffsetCriteria;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
-import org.apache.pinot.core.realtime.stream.StreamConfigProperties;
-import org.apache.pinot.core.realtime.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
 
 
 /**
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/MetadataEqualsHashCodeTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/MetadataEqualsHashCodeTest.java
index 0d5b0fa..4690eb3 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/MetadataEqualsHashCodeTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/MetadataEqualsHashCodeTest.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.realtime.stream;
 
 import nl.jqno.equalsverifier.EqualsVerifier;
 import nl.jqno.equalsverifier.Warning;
+import org.apache.pinot.spi.stream.StreamConfig;
 import org.testng.annotations.Test;
 
 
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/OffsetCriteriaTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/OffsetCriteriaTest.java
index 461788c..b99d49f 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/OffsetCriteriaTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/OffsetCriteriaTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.realtime.stream;
 
+import org.apache.pinot.spi.stream.OffsetCriteria;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java
index 85c22d3..4afad60 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java
@@ -20,8 +20,12 @@ package org.apache.pinot.core.realtime.stream;
 
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.pinot.common.utils.DataSize;
-import org.apache.pinot.common.utils.time.TimeUtils;
+import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.apache.pinot.spi.utils.DataSize;
+import org.apache.pinot.spi.utils.TimeUtils;
 import 
org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConsumerFactory;
 import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamMessageDecoder;
 import org.testng.Assert;
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithTimeColumnTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithTimeColumnTest.java
index e0dead7..c2d88da 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithTimeColumnTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithTimeColumnTest.java
@@ -30,7 +30,7 @@ import org.apache.pinot.spi.data.DimensionFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.TimeFieldSpec;
-import org.apache.pinot.common.utils.time.TimeUtils;
+import org.apache.pinot.spi.utils.TimeUtils;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.core.data.readers.GenericRowRecordReader;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/TransformQueriesTest.java 
b/pinot-core/src/test/java/org/apache/pinot/queries/TransformQueriesTest.java
index 876c57e..5967b79 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/TransformQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/TransformQueriesTest.java
@@ -33,7 +33,7 @@ import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.common.response.broker.AggregationResult;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.segment.ReadMode;
-import org.apache.pinot.common.utils.time.TimeUtils;
+import org.apache.pinot.spi.utils.TimeUtils;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.core.data.manager.SegmentDataManager;
 import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
diff --git 
a/pinot-ingestion-jobs/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
 
b/pinot-ingestion-jobs/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
index cc26713..7ad3d74 100644
--- 
a/pinot-ingestion-jobs/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
+++ 
b/pinot-ingestion-jobs/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
@@ -39,7 +39,7 @@ import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.csv.data.readers.CSVRecordReaderConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.TimeFieldSpec;
-import org.apache.pinot.common.utils.DataSize;
+import org.apache.pinot.spi.utils.DataSize;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.core.data.readers.FileFormat;
diff --git 
a/pinot-ingestion-jobs/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SparkSegmentCreationFunction.java
 
b/pinot-ingestion-jobs/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SparkSegmentCreationFunction.java
index 34b5df3..30984e3 100644
--- 
a/pinot-ingestion-jobs/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SparkSegmentCreationFunction.java
+++ 
b/pinot-ingestion-jobs/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SparkSegmentCreationFunction.java
@@ -33,7 +33,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.pinot.common.config.SegmentsValidationAndRetentionConfig;
 import org.apache.pinot.common.config.TableConfig;
-import org.apache.pinot.common.utils.DataSize;
+import org.apache.pinot.spi.utils.DataSize;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.core.data.readers.FileFormat;
 import org.apache.pinot.thrift.data.readers.ThriftRecordReaderConfig;
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index 997d980..6c1ca4d 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -40,7 +40,7 @@ import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.common.utils.ZkStarter;
 import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
-import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;
+import org.apache.pinot.spi.stream.StreamDataServerStartable;
 import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
 
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
index ce21f4f..af07f75 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
@@ -63,8 +63,8 @@ import org.apache.pinot.common.utils.StringUtil;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
-import org.apache.pinot.core.realtime.stream.StreamDataProducer;
-import org.apache.pinot.core.realtime.stream.StreamDataProvider;
+import org.apache.pinot.spi.stream.StreamDataProducer;
+import org.apache.pinot.spi.stream.StreamDataProvider;
 import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
 import 
org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.core.startree.v2.builder.StarTreeV2BuilderConfig;
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index 00b3570..d281ed3 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -65,9 +65,9 @@ import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
 import org.apache.pinot.core.realtime.impl.kafka.KafkaStreamConfigProperties;
 import org.apache.pinot.core.realtime.stream.AvroRecordToPinotRowGenerator;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
-import org.apache.pinot.core.realtime.stream.StreamConfigProperties;
-import org.apache.pinot.core.realtime.stream.StreamMessageDecoder;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.apache.pinot.spi.stream.StreamMessageDecoder;
 import org.apache.pinot.minion.MinionStarter;
 import org.apache.pinot.minion.events.MinionEventObserverFactory;
 import org.apache.pinot.minion.executor.PinotTaskExecutorFactory;
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
index 35c7e10..d1100bc 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
@@ -21,14 +21,12 @@ package org.apache.pinot.integration.tests;
 import java.lang.reflect.Constructor;
 import java.util.Random;
 import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
-import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.core.realtime.stream.PartitionLevelConsumer;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
-import org.apache.pinot.core.realtime.stream.StreamConsumerFactory;
-import org.apache.pinot.core.realtime.stream.StreamLevelConsumer;
-import org.apache.pinot.core.realtime.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.stream.PartitionLevelConsumer;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamLevelConsumer;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
 import org.testng.annotations.BeforeClass;
 
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java
index 157b525..2f48436 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java
@@ -39,7 +39,7 @@ import 
org.apache.pinot.broker.requesthandler.PinotQueryRequest;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;
+import org.apache.pinot.spi.stream.StreamDataServerStartable;
 import org.apache.pinot.tools.query.comparison.QueryComparison;
 import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
 import org.apache.pinot.util.TestUtils;
diff --git 
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRealtimeConsumptionSpeed.java
 
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRealtimeConsumptionSpeed.java
index 2d5e599..69b095a 100644
--- 
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRealtimeConsumptionSpeed.java
+++ 
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRealtimeConsumptionSpeed.java
@@ -27,7 +27,7 @@ import java.util.Random;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
-import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;
+import org.apache.pinot.spi.stream.StreamDataServerStartable;
 import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils;
 import org.apache.pinot.integration.tests.RealtimeClusterIntegrationTest;
 import org.apache.pinot.util.TestUtils;
diff --git 
a/pinot-perf/src/main/java/org/apache/pinot/perf/RealtimeStressTest.java 
b/pinot-perf/src/main/java/org/apache/pinot/perf/RealtimeStressTest.java
index 8c3f573..f9a8dc5 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/RealtimeStressTest.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/RealtimeStressTest.java
@@ -27,7 +27,7 @@ import java.util.Random;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
-import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;
+import org.apache.pinot.spi.stream.StreamDataServerStartable;
 import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils;
 import org.apache.pinot.integration.tests.OfflineClusterIntegrationTest;
 import org.apache.pinot.integration.tests.RealtimeClusterIntegrationTest;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/MessageBatch.java
 b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
similarity index 97%
rename from 
pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/MessageBatch.java
rename to pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
index cb20a13..9ac7626 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/MessageBatch.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.realtime.stream;
+package org.apache.pinot.spi.stream;
 
 import org.apache.pinot.spi.annotations.InterfaceAudience;
 import org.apache.pinot.spi.annotations.InterfaceStability;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/OffsetCriteria.java
 b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/OffsetCriteria.java
similarity index 98%
rename from 
pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/OffsetCriteria.java
rename to 
pinot-spi/src/main/java/org/apache/pinot/spi/stream/OffsetCriteria.java
index e3f0ad1..3093949 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/OffsetCriteria.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/OffsetCriteria.java
@@ -16,11 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.realtime.stream;
+package org.apache.pinot.spi.stream;
 
 import com.google.common.base.Preconditions;
 import org.apache.pinot.spi.utils.EqualityUtils;
-import org.apache.pinot.common.utils.time.TimeUtils;
+import org.apache.pinot.spi.utils.TimeUtils;
 
 
 /**
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/PartitionCountFetcher.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionCountFetcher.java
similarity index 98%
rename from 
pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/PartitionCountFetcher.java
rename to 
pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionCountFetcher.java
index 174c975..d523235 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/PartitionCountFetcher.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionCountFetcher.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.realtime.stream;
+package org.apache.pinot.spi.stream;
 
 import java.util.concurrent.Callable;
 import org.slf4j.Logger;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/PartitionLevelConsumer.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLevelConsumer.java
similarity index 97%
rename from 
pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/PartitionLevelConsumer.java
rename to 
pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLevelConsumer.java
index 6b30055..3efaef6 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/PartitionLevelConsumer.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLevelConsumer.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.realtime.stream;
+package org.apache.pinot.spi.stream;
 
 import java.io.Closeable;
 import org.apache.pinot.spi.annotations.InterfaceAudience;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/PartitionLevelStreamConfig.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLevelStreamConfig.java
similarity index 93%
rename from 
pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/PartitionLevelStreamConfig.java
rename to 
pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLevelStreamConfig.java
index 46b2c3d..0f6a383 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/PartitionLevelStreamConfig.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLevelStreamConfig.java
@@ -16,12 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.realtime.stream;
+package org.apache.pinot.spi.stream;
 
 import com.google.common.base.Preconditions;
 import java.util.Map;
-import org.apache.pinot.common.config.TableConfig;
-import org.apache.pinot.common.utils.time.TimeUtils;
+import org.apache.pinot.spi.utils.TimeUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,10 +32,6 @@ import org.slf4j.LoggerFactory;
 public class PartitionLevelStreamConfig extends StreamConfig {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PartitionLevelStreamConfig.class);
 
-  public PartitionLevelStreamConfig(TableConfig tableConfig) {
-    super(tableConfig);
-  }
-
   public PartitionLevelStreamConfig(String tableNameWithType, Map<String, 
String> streamConfigMap) {
     super(tableNameWithType, streamConfigMap);
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/PartitionOffsetFetcher.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionOffsetFetcher.java
similarity index 98%
rename from 
pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/PartitionOffsetFetcher.java
rename to 
pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionOffsetFetcher.java
index dc4927f..0072bc8 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/PartitionOffsetFetcher.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionOffsetFetcher.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.realtime.stream;
+package org.apache.pinot.spi.stream;
 
 import java.util.concurrent.Callable;
 import org.slf4j.Logger;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/PermanentConsumerException.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PermanentConsumerException.java
similarity index 96%
rename from 
pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/PermanentConsumerException.java
rename to 
pinot-spi/src/main/java/org/apache/pinot/spi/stream/PermanentConsumerException.java
index 7a61bdb..4e2f83a 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/PermanentConsumerException.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PermanentConsumerException.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.realtime.stream;
+package org.apache.pinot.spi.stream;
 
 /**
  * A stream subsystem error that indicates a situation that is not likely to 
clear up by retrying the request (for
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/RowMetadata.java
 b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowMetadata.java
similarity index 97%
rename from 
pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/RowMetadata.java
rename to pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowMetadata.java
index bb4997b..a8b0f21 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/RowMetadata.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowMetadata.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.realtime.stream;
+package org.apache.pinot.spi.stream;
 
 import org.apache.pinot.spi.annotations.InterfaceAudience;
 import org.apache.pinot.spi.annotations.InterfaceStability;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamConfig.java
 b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
similarity index 97%
rename from 
pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamConfig.java
rename to pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
index 59b65f72..13f0c77 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.realtime.stream;
+package org.apache.pinot.spi.stream;
 
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
@@ -24,10 +24,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import org.apache.pinot.common.config.TableConfig;
-import org.apache.pinot.common.utils.DataSize;
+import org.apache.pinot.spi.utils.DataSize;
 import org.apache.pinot.spi.utils.EqualityUtils;
-import org.apache.pinot.common.utils.time.TimeUtils;
+import org.apache.pinot.spi.utils.TimeUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -79,10 +78,6 @@ public class StreamConfig {
 
   private final Map<String, String> _streamConfigMap = new HashMap<>();
 
-  public StreamConfig(TableConfig tableConfig) {
-    this(tableConfig.getTableName(), 
tableConfig.getIndexingConfig().getStreamConfigs());
-  }
-
   /**
    * Initializes a StreamConfig using the map of stream configs from the table 
config
    */
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamConfigProperties.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
similarity index 99%
rename from 
pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamConfigProperties.java
rename to 
pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
index 5b4cbf1..fc46c9b 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamConfigProperties.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.realtime.stream;
+package org.apache.pinot.spi.stream;
 
 import com.google.common.base.Joiner;
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamConsumerFactory.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
similarity index 94%
rename from 
pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamConsumerFactory.java
rename to 
pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
index 1f6889b..60b1da2 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamConsumerFactory.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
@@ -16,11 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.realtime.stream;
+package org.apache.pinot.spi.stream;
 
 import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
-import org.apache.pinot.common.metrics.ServerMetrics;
 
 
 /**
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamConsumerFactoryProvider.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactoryProvider.java
similarity index 91%
rename from 
pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamConsumerFactoryProvider.java
rename to 
pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactoryProvider.java
index e2bfb6f..824c439 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamConsumerFactoryProvider.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactoryProvider.java
@@ -16,9 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.realtime.stream;
+package org.apache.pinot.spi.stream;
 
-import org.apache.pinot.common.Utils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
 
 
 /**
@@ -36,7 +36,7 @@ public abstract class StreamConsumerFactoryProvider {
     try {
       factory = (StreamConsumerFactory) 
Class.forName(streamConfig.getConsumerFactoryClassName()).newInstance();
     } catch (Exception e) {
-      Utils.rethrowException(e);
+      ExceptionUtils.rethrow(e);
     }
     factory.init(streamConfig);
     return factory;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataProducer.java
 b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataProducer.java
similarity index 96%
rename from 
pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataProducer.java
rename to 
pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataProducer.java
index 53275a0..ed19f86 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataProducer.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataProducer.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.realtime.stream;
+package org.apache.pinot.spi.stream;
 
 import java.util.Properties;
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataProvider.java
 b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataProvider.java
similarity index 97%
rename from 
pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataProvider.java
rename to 
pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataProvider.java
index 05e0827..3889da4 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataProvider.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataProvider.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.realtime.stream;
+package org.apache.pinot.spi.stream;
 
 import java.util.Properties;
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataServerStartable.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataServerStartable.java
similarity index 96%
rename from 
pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataServerStartable.java
rename to 
pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataServerStartable.java
index 34a1a38..afa2a0b 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataServerStartable.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataServerStartable.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.realtime.stream;
+package org.apache.pinot.spi.stream;
 
 import java.util.Properties;
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDecoderProvider.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDecoderProvider.java
similarity index 92%
rename from 
pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDecoderProvider.java
rename to 
pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDecoderProvider.java
index 72be342..d6b8114 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDecoderProvider.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDecoderProvider.java
@@ -16,10 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.realtime.stream;
+package org.apache.pinot.spi.stream;
 
 import java.util.Map;
-import org.apache.pinot.common.Utils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.pinot.spi.data.Schema;
 
 
@@ -42,7 +42,7 @@ public abstract class StreamDecoderProvider {
       decoder = (StreamMessageDecoder) 
Class.forName(decoderClass).newInstance();
       decoder.init(decoderProperties, schema, streamConfig.getTopicName());
     } catch (Exception e) {
-      Utils.rethrowException(e);
+      ExceptionUtils.rethrow(e);
     }
     return decoder;
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamLevelConsumer.java
 b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamLevelConsumer.java
similarity index 97%
rename from 
pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamLevelConsumer.java
rename to 
pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamLevelConsumer.java
index eb1f8c8..5501fe9 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamLevelConsumer.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamLevelConsumer.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.realtime.stream;
+package org.apache.pinot.spi.stream;
 
 import org.apache.pinot.spi.annotations.InterfaceAudience;
 import org.apache.pinot.spi.annotations.InterfaceStability;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageDecoder.java
 b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageDecoder.java
similarity index 97%
rename from 
pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageDecoder.java
rename to 
pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageDecoder.java
index 82a1356..4898916 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageDecoder.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageDecoder.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.realtime.stream;
+package org.apache.pinot.spi.stream;
 
 import java.util.Map;
 import org.apache.pinot.spi.annotations.InterfaceAudience;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageMetadata.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java
similarity index 96%
rename from 
pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageMetadata.java
rename to 
pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java
index 31983f0..f32216a 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageMetadata.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.realtime.stream;
+package org.apache.pinot.spi.stream;
 
 /**
  * A class that provides metadata associated with the message of a stream, for 
e.g.,
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMetadataProvider.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
similarity index 97%
rename from 
pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMetadataProvider.java
rename to 
pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
index b9469c4..b57deb3 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMetadataProvider.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.realtime.stream;
+package org.apache.pinot.spi.stream;
 
 import java.io.Closeable;
 import javax.annotation.Nonnull;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/TransientConsumerException.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/TransientConsumerException.java
similarity index 95%
rename from 
pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/TransientConsumerException.java
rename to 
pinot-spi/src/main/java/org/apache/pinot/spi/stream/TransientConsumerException.java
index 568b543..affb792 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/TransientConsumerException.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/TransientConsumerException.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.realtime.stream;
+package org.apache.pinot.spi.stream;
 
 /**
  * A stream subsystem error that indicates a situation that is likely to be 
transient (for example, network error or
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSize.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/DataSize.java
similarity index 98%
rename from 
pinot-common/src/main/java/org/apache/pinot/common/utils/DataSize.java
rename to pinot-spi/src/main/java/org/apache/pinot/spi/utils/DataSize.java
index 9e80324..d74baf2 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSize.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/DataSize.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.common.utils;
+package org.apache.pinot.spi.utils;
 
 import java.math.BigDecimal;
 import java.text.DecimalFormat;
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/time/TimeUtils.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/TimeUtils.java
similarity index 99%
rename from 
pinot-common/src/main/java/org/apache/pinot/common/utils/time/TimeUtils.java
rename to pinot-spi/src/main/java/org/apache/pinot/spi/utils/TimeUtils.java
index 86136d1..39c3a58 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/time/TimeUtils.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/TimeUtils.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.common.utils.time;
+package org.apache.pinot.spi.utils;
 
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java 
b/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
index f74c8b6..646dcae 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
@@ -28,8 +28,8 @@ import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.common.utils.ZkStarter;
 import org.apache.pinot.core.data.readers.FileFormat;
 import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
-import org.apache.pinot.core.realtime.stream.StreamDataProvider;
-import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;
+import org.apache.pinot.spi.stream.StreamDataProvider;
+import org.apache.pinot.spi.stream.StreamDataServerStartable;
 import org.apache.pinot.tools.Quickstart.Color;
 import org.apache.pinot.tools.admin.command.QuickstartRunner;
 import org.apache.pinot.tools.streams.AirlineDataStream;
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java 
b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java
index 7b0696a..a59f528 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java
@@ -25,8 +25,8 @@ import java.net.URL;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.utils.ZkStarter;
 import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
-import org.apache.pinot.core.realtime.stream.StreamDataProvider;
-import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;
+import org.apache.pinot.spi.stream.StreamDataProvider;
+import org.apache.pinot.spi.stream.StreamDataServerStartable;
 import org.apache.pinot.tools.Quickstart.Color;
 import org.apache.pinot.tools.admin.command.QuickstartRunner;
 import org.apache.pinot.tools.streams.MeetupRsvpStream;
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OfflineSegmentIntervalCheckerCommand.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OfflineSegmentIntervalCheckerCommand.java
index 7e83724..e971be0 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OfflineSegmentIntervalCheckerCommand.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OfflineSegmentIntervalCheckerCommand.java
@@ -32,7 +32,7 @@ import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
-import org.apache.pinot.common.utils.time.TimeUtils;
+import org.apache.pinot.spi.utils.TimeUtils;
 import org.apache.pinot.controller.util.SegmentIntervalUtils;
 import org.apache.pinot.tools.Command;
 import org.joda.time.Interval;
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RealtimeProvisioningHelperCommand.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RealtimeProvisioningHelperCommand.java
index 930c27c..06fffe4 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RealtimeProvisioningHelperCommand.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RealtimeProvisioningHelperCommand.java
@@ -26,8 +26,8 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.common.config.TableConfig;
-import org.apache.pinot.common.utils.DataSize;
-import org.apache.pinot.common.utils.time.TimeUtils;
+import org.apache.pinot.spi.utils.DataSize;
+import org.apache.pinot.spi.utils.TimeUtils;
 import org.apache.pinot.tools.Command;
 import org.apache.pinot.tools.realtime.provisioning.MemoryEstimator;
 import org.kohsuke.args4j.Option;
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartKafkaCommand.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartKafkaCommand.java
index 36d58e5..1455b9b 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartKafkaCommand.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartKafkaCommand.java
@@ -21,8 +21,8 @@ package org.apache.pinot.tools.admin.command;
 import java.io.File;
 import java.io.IOException;
 import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
-import org.apache.pinot.core.realtime.stream.StreamDataProvider;
-import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;
+import org.apache.pinot.spi.stream.StreamDataProvider;
+import org.apache.pinot.spi.stream.StreamDataServerStartable;
 import org.apache.pinot.tools.Command;
 import org.kohsuke.args4j.Option;
 import org.slf4j.Logger;
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java
index ac11660..04dcb5d 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java
@@ -34,8 +34,8 @@ import org.apache.avro.io.EncoderFactory;
 import org.apache.pinot.avro.data.readers.AvroUtils;
 import org.apache.pinot.common.utils.HashUtil;
 import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
-import org.apache.pinot.core.realtime.stream.StreamDataProducer;
-import org.apache.pinot.core.realtime.stream.StreamDataProvider;
+import org.apache.pinot.spi.stream.StreamDataProducer;
+import org.apache.pinot.spi.stream.StreamDataProvider;
 import org.apache.pinot.tools.Command;
 import org.kohsuke.args4j.Option;
 import org.slf4j.Logger;
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
index 91e95b1..865591b 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
@@ -28,7 +28,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
-import org.apache.pinot.common.utils.DataSize;
+import org.apache.pinot.spi.utils.DataSize;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.core.data.readers.PinotSegmentRecordReader;
 import org.apache.pinot.core.indexsegment.mutable.MutableSegmentImpl;
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java
index 7fca814..9baf1f1 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java
@@ -35,8 +35,8 @@ import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.TimeFieldSpec;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
-import org.apache.pinot.core.realtime.stream.StreamDataProducer;
-import org.apache.pinot.core.realtime.stream.StreamDataProvider;
+import org.apache.pinot.spi.stream.StreamDataProducer;
+import org.apache.pinot.spi.stream.StreamDataProvider;
 import org.apache.pinot.tools.Quickstart;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
index b44b33d..664e548 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
@@ -33,9 +33,9 @@ import javax.websocket.Session;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
-import org.apache.pinot.core.realtime.stream.StreamDataProducer;
-import org.apache.pinot.core.realtime.stream.StreamDataProvider;
-import org.apache.pinot.core.realtime.stream.StreamMessageDecoder;
+import org.apache.pinot.spi.stream.StreamDataProducer;
+import org.apache.pinot.spi.stream.StreamDataProvider;
+import org.apache.pinot.spi.stream.StreamMessageDecoder;
 import org.glassfish.tyrus.client.ClientManager;
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to