http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
index 4caa401..35a3513 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.SparkSession
 
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.datastore.block.SegmentProperties
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
@@ -197,8 +198,8 @@ class StreamHandoffRDD[K, V](
   override protected def getPartitions: Array[Partition] = {
     val job = Job.getInstance(FileFactory.getConfiguration)
     val inputFormat = new CarbonTableInputFormat[Array[Object]]()
-    val segmentList = new util.ArrayList[String](1)
-    segmentList.add(handOffSegmentId)
+    val segmentList = new util.ArrayList[Segment](1)
+    segmentList.add(Segment.toSegment(handOffSegmentId))
     val splits = inputFormat.getSplitsOfStreaming(
       job,
       carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getAbsoluteTableIdentifier,

Reply via email to