I have a native R model and I'm running Structured Streaming with it. Data comes from Kafka into a dapply call, where my model makes its prediction, and the result is written to the sink.
Problem: my model requires the caret package. Inside the dapply function, caret is loaded again for every micro-batch, which adds roughly 2 s of latency per batch.

    kafka <- read.stream("kafka",
                         subscribe = "source",
                         kafka.bootstrap.servers = "10.117.172.48:9092")
    lines <- select(kafka, cast(kafka$value, "string"))
    schema <- schema(lines)

    library(caret)
    df4 <- dapply(lines, function(x) {
      print(system.time(library(caret)))
      x
    }, schema)

    q2 <- write.stream(df4, "kafka",
                       checkpointLocation = loc,
                       topic = "sink",
                       kafka.bootstrap.servers = "10.117.172.48:9092")
    awaitTermination(q2)

With the above code, every new micro-batch produces output like this:

    18/03/23 11:08:10 INFO BufferedStreamThread: Loading required package: lattice
    18/03/23 11:08:10 INFO BufferedStreamThread:
    18/03/23 11:08:10 INFO BufferedStreamThread: Attaching package: ‘lattice’
    18/03/23 11:08:10 INFO BufferedStreamThread:
    18/03/23 11:08:10 INFO BufferedStreamThread: The following object is masked from ‘package:SparkR’:
    18/03/23 11:08:10 INFO BufferedStreamThread:
    18/03/23 11:08:10 INFO BufferedStreamThread:     histogram
    18/03/23 11:08:10 INFO BufferedStreamThread:
    18/03/23 11:08:10 INFO BufferedStreamThread: Loading required package: ggplot2
    18/03/23 11:08:12 INFO BufferedStreamThread:    user  system elapsed
    18/03/23 11:08:12 INFO BufferedStreamThread:   1.937   0.062   1.999
    18/03/23 11:08:12 INFO RRunner: Times: boot = 0.009 s, init = 0.017 s, broadcast = 0.001 s, read-input = 0.001 s, compute = 2.064 s, write-output = 0.001 s, total = 2.093 s

PFA: the rest of the log file. Ideally, the packages shouldn't be loaded again for each batch; it looks like the R environment is being created and destroyed with each query. Is there some solution to this, or am I missing something here?

Thanks,
Deepansh
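To narrow this down, here is a diagnostic sketch of the dapply body (illustration only; the PID/attached check is my addition and not part of the real job). If the printed PID changes on every micro-batch, a fresh R worker process is being forked each time, which would explain why library(caret) can never stay loaded:

```r
# Diagnostic sketch: report the worker PID and whether caret is already
# attached before timing the library() call. A changing PID per batch means
# a new R process per batch, so the package load cost recurs by design.
df4 <- dapply(lines, function(x) {
  attached <- "package:caret" %in% search()
  print(paste("worker PID:", Sys.getpid(), "| caret already attached:", attached))
  if (!attached) {
    print(system.time(library(caret)))
  }
  x
}, schema)
```

Sys.getpid() and search() are base R, so this adds no dependencies to the worker.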
Java ref type org.apache.spark.sql.SparkSession id 1
Re-using existing Spark Context. Call sparkR.session.stop() or restart R to create a new Spark Context
Warning message:
'sparkR.init' is deprecated.
Use 'sparkR.session' instead.
See help("Deprecated")
Loading required package: lattice

Attaching package: ‘lattice’

The following object is masked from ‘package:SparkR’:

    histogram

Loading required package: ggplot2
18/03/23 11:07:58 INFO StreamExecution: Starting [id = 1605966f-51e3-4df9-b284-7535e37b6d44, runId = ab605bec-3d57-4a61-a45b-5174292ab2ec]. Use /tmp/temporary-%20Gunp5pUYnV to store the query checkpoint.
18/03/23 11:07:58 INFO ConsumerConfig: ConsumerConfig values:
	auto.commit.interval.ms = 5000
	auto.offset.reset = earliest
	bootstrap.servers = [10.117.172.48:9092]
	check.crcs = true
	client.id =
	connections.max.idle.ms = 540000
	enable.auto.commit = false
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = spark-kafka-source-a46c9dc7-36fc-40d3-a5c3-28d566108ab3--149645343-driver-0
	heartbeat.interval.ms = 3000
	interceptor.classes = null
	key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 1
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.ms = 50
	request.timeout.ms = 305000
	retry.backoff.ms = 100
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
18/03/23 11:07:59 INFO AppInfoParser: Kafka version : 0.10.2.0
18/03/23 11:07:59 INFO AppInfoParser: Kafka commitId : 576d93a8dc0cf421
18/03/23 11:07:59 INFO SessionState: Created local directory: /tmp/fa519f67-cbad-4691-a260-744dd9c9a432_resources
18/03/23 11:07:59 INFO SessionState: Created HDFS directory: /tmp/hive/d0g00m3/fa519f67-cbad-4691-a260-744dd9c9a432
18/03/23 11:07:59 INFO SessionState: Created local directory: /tmp/d0g00m3/fa519f67-cbad-4691-a260-744dd9c9a432
18/03/23 11:07:59 INFO SessionState: Created HDFS directory: /tmp/hive/d0g00m3/fa519f67-cbad-4691-a260-744dd9c9a432/_tmp_space.db
18/03/23 11:07:59 INFO HiveClientImpl: Warehouse location for Hive client (version 1.2.1) is file:/u/users/d0g00m3/spark-warehouse/
18/03/23 11:07:59 INFO StreamExecution: Starting new streaming query.
18/03/23 11:07:59 INFO AbstractCoordinator: Discovered coordinator 10.117.172.48:9092 (id: 2147483647 rack: null) for group spark-kafka-source-a46c9dc7-36fc-40d3-a5c3-28d566108ab3--149645343-driver-0.
18/03/23 11:07:59 INFO ConsumerCoordinator: Revoking previously assigned partitions [] for group spark-kafka-source-a46c9dc7-36fc-40d3-a5c3-28d566108ab3--149645343-driver-0
18/03/23 11:07:59 INFO AbstractCoordinator: (Re-)joining group spark-kafka-source-a46c9dc7-36fc-40d3-a5c3-28d566108ab3--149645343-driver-0
18/03/23 11:07:59 INFO AbstractCoordinator: Successfully joined group spark-kafka-source-a46c9dc7-36fc-40d3-a5c3-28d566108ab3--149645343-driver-0 with generation 1
18/03/23 11:07:59 INFO ConsumerCoordinator: Setting newly assigned partitions [source-0] for group spark-kafka-source-a46c9dc7-36fc-40d3-a5c3-28d566108ab3--149645343-driver-0
18/03/23 11:07:59 INFO KafkaSource: Initial offsets: {"source":{"0":4647070}}
18/03/23 11:07:59 INFO StreamExecution: Committed offsets for batch 0. Metadata OffsetSeqMetadata(0,1521803279883,Map(spark.sql.shuffle.partitions -> 200))
18/03/23 11:07:59 INFO KafkaSource: GetBatch called with start = None, end = {"source":{"0":4647070}}
18/03/23 11:07:59 INFO KafkaSource: Partitions added: Map()
18/03/23 11:08:00 INFO KafkaSource: GetBatch generating RDD of offset range: KafkaSourceRDDOffsetRange(source-0,4647070,4647070,None)
18/03/23 11:08:00 INFO CodeGenerator: Code generated in 222.605008 ms
18/03/23 11:08:00 INFO CodeGenerator: Code generated in 12.444683 ms
18/03/23 11:08:00 INFO SparkContext: Starting job: start at NativeMethodAccessorImpl.java:0
18/03/23 11:08:00 INFO DAGScheduler: Got job 0 (start at NativeMethodAccessorImpl.java:0) with 1 output partitions
18/03/23 11:08:00 INFO DAGScheduler: Final stage: ResultStage 0 (start at NativeMethodAccessorImpl.java:0)
18/03/23 11:08:00 INFO DAGScheduler: Parents of final stage: List()
18/03/23 11:08:00 INFO DAGScheduler: Missing parents: List()
18/03/23 11:08:00 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[6] at start at NativeMethodAccessorImpl.java:0), which has no missing parents
18/03/23 11:08:00 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 16.9 KB, free 366.3 MB)
18/03/23 11:08:00 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 7.8 KB, free 366.3 MB)
18/03/23 11:08:00 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.246.79.20:46249 (size: 7.8 KB, free: 366.3 MB)
18/03/23 11:08:00 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
18/03/23 11:08:00 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[6] at start at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))
18/03/23 11:08:00 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
18/03/23 11:08:00 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 5001 bytes)
18/03/23 11:08:00 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
18/03/23 11:08:00 INFO Executor: Fetching file:/u/users/d0g00m3/testlib.R with timestamp 1521803266773
18/03/23 11:08:00 INFO Utils: /u/users/d0g00m3/testlib.R has been previously copied to /tmp/spark-e2d1b965-a3dc-4847-82ce-b09b8f7e2b76/userFiles-f7e87ef1-3a54-4f3f-8ee2-fa10278d0227/testlib.R
18/03/23 11:08:00 INFO Executor: Fetching spark://10.246.79.20:57945/jars/org.apache.kafka_kafka-clients-0.10.2.0.jar with timestamp 1521803266547
18/03/23 11:08:00 INFO TransportClientFactory: Successfully created connection to /10.246.79.20:57945 after 10 ms (0 ms spent in bootstraps)
18/03/23 11:08:00 INFO Utils: Fetching spark://10.246.79.20:57945/jars/org.apache.kafka_kafka-clients-0.10.2.0.jar to /tmp/spark-e2d1b965-a3dc-4847-82ce-b09b8f7e2b76/userFiles-f7e87ef1-3a54-4f3f-8ee2-fa10278d0227/fetchFileTemp2480019594959116903.tmp
18/03/23 11:08:00 INFO Executor: Adding file:/tmp/spark-e2d1b965-a3dc-4847-82ce-b09b8f7e2b76/userFiles-f7e87ef1-3a54-4f3f-8ee2-fa10278d0227/org.apache.kafka_kafka-clients-0.10.2.0.jar to class loader
18/03/23 11:08:00 INFO Executor: Fetching
spark://10.246.79.20:57945/jars/org.apache.spark_spark-sql-kafka-0-10_2.11-2.2.0.jar with timestamp 1521803266546
18/03/23 11:08:00 INFO Utils: Fetching spark://10.246.79.20:57945/jars/org.apache.spark_spark-sql-kafka-0-10_2.11-2.2.0.jar to /tmp/spark-e2d1b965-a3dc-4847-82ce-b09b8f7e2b76/userFiles-f7e87ef1-3a54-4f3f-8ee2-fa10278d0227/fetchFileTemp8966057625455886086.tmp
18/03/23 11:08:00 INFO Executor: Adding file:/tmp/spark-e2d1b965-a3dc-4847-82ce-b09b8f7e2b76/userFiles-f7e87ef1-3a54-4f3f-8ee2-fa10278d0227/org.apache.spark_spark-sql-kafka-0-10_2.11-2.2.0.jar to class loader
18/03/23 11:08:01 INFO ConsumerConfig: ConsumerConfig values:
	auto.commit.interval.ms = 5000
	auto.offset.reset = none
	bootstrap.servers = [10.117.172.48:9092]
	check.crcs = true
	client.id =
	connections.max.idle.ms = 540000
	enable.auto.commit = false
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = spark-kafka-source-a46c9dc7-36fc-40d3-a5c3-28d566108ab3--149645343-executor
	heartbeat.interval.ms = 3000
	interceptor.classes = null
	key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.ms = 50
	request.timeout.ms = 305000
	retry.backoff.ms = 100
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
18/03/23 11:08:01 INFO AppInfoParser: Kafka version : 0.10.2.0
18/03/23 11:08:01 INFO AppInfoParser: Kafka commitId : 576d93a8dc0cf421
18/03/23 11:08:01 INFO KafkaSourceRDD: Beginning offset 4647070 is the same as ending offset skipping source 0
18/03/23 11:08:01 INFO CodeGenerator: Code generated in 24.246571 ms
18/03/23 11:08:01 INFO CodeGenerator: Code generated in 14.032234 ms
18/03/23 11:08:01 INFO RRunner: Times: boot = 0.718 s, init = 0.009 s, broadcast = 0.000 s, read-input = 0.000 s, compute = 0.000 s, write-output = 0.000 s, total = 0.727 s
18/03/23 11:08:01 INFO CodeGenerator: Code generated in 13.304316 ms
18/03/23 11:08:01 INFO ProducerConfig: ProducerConfig values:
	acks = 1
	batch.size = 16384
	block.on.buffer.full = false
	bootstrap.servers = [10.117.172.48:9092]
	buffer.memory = 33554432
	client.id =
	compression.type = none
	connections.max.idle.ms = 540000
	interceptor.classes = null
	key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.fetch.timeout.ms = 60000
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 0
	retry.backoff.ms = 100
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	timeout.ms = 30000
	value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
18/03/23 11:08:01 INFO AppInfoParser: Kafka version : 0.10.2.0
18/03/23 11:08:01 INFO AppInfoParser: Kafka commitId : 576d93a8dc0cf421
18/03/23 11:08:01 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1154 bytes result sent to driver
18/03/23 11:08:01 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1096 ms on localhost (executor driver) (1/1)
18/03/23 11:08:01 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
18/03/23 11:08:01 INFO DAGScheduler: ResultStage 0 (start at NativeMethodAccessorImpl.java:0) finished in 1.128 s
18/03/23 11:08:01 INFO DAGScheduler: Job 0 finished: start at NativeMethodAccessorImpl.java:0, took 1.335817 s
18/03/23 11:08:01 INFO StreamExecution: Streaming query made progress: { "id" : "1605966f-51e3-4df9-b284-7535e37b6d44", "runId" : "ab605bec-3d57-4a61-a45b-5174292ab2ec", "name" : null, "timestamp" : "2018-03-23T11:07:59.157Z", "numInputRows" : 0, "processedRowsPerSecond" : 0.0, "durationMs" : { "addBatch" : 1754, "getBatch" : 115, "getOffset" : 715, "queryPlanning" : 135, "triggerExecution" : 2782, "walCommit" : 43 }, "stateOperators" : [ ], "sources" : [ { "description" : "KafkaSource[Subscribe[source]]", "startOffset" : null, "endOffset" : { "source" : { "0" : 4647070 } }, "numInputRows" : 0, "processedRowsPerSecond" : 0.0 } ], "sink" : { "description" : "KafkaSink" } }
18/03/23 11:08:02 INFO StreamExecution: Streaming query made progress: { "id" : "1605966f-51e3-4df9-b284-7535e37b6d44", "runId" : "ab605bec-3d57-4a61-a45b-5174292ab2ec", "name" : null, "timestamp" : "2018-03-23T11:08:02.197Z", "numInputRows" : 0, "inputRowsPerSecond" : 0.0, "processedRowsPerSecond" : 0.0, "durationMs" : { "getOffset" : 15, "triggerExecution" : 16 }, "stateOperators" : [ ], "sources" : [ { "description" : "KafkaSource[Subscribe[source]]", "startOffset" : { "source" : { "0" : 4647070 } }, "endOffset" : { "source" : { "0" : 4647070 } }, "numInputRows" : 0, "inputRowsPerSecond" : 0.0, "processedRowsPerSecond" : 0.0 } ], "sink" : { "description" : "KafkaSink" } }
18/03/23 11:08:04 INFO StreamExecution: Committed offsets for batch 1. Metadata OffsetSeqMetadata(0,1521803284620,Map(spark.sql.shuffle.partitions -> 200))
18/03/23 11:08:04 INFO KafkaSource: GetBatch called with start = Some({"source":{"0":4647070}}), end = {"source":{"0":4647071}}
18/03/23 11:08:04 INFO KafkaSource: Partitions added: Map()
18/03/23 11:08:04 INFO KafkaSource: GetBatch generating RDD of offset range: KafkaSourceRDDOffsetRange(source-0,4647070,4647071,None)
18/03/23 11:08:04 INFO SparkContext: Starting job: start at NativeMethodAccessorImpl.java:0
18/03/23 11:08:04 INFO DAGScheduler: Got job 1 (start at NativeMethodAccessorImpl.java:0) with 1 output partitions
18/03/23 11:08:04 INFO DAGScheduler: Final stage: ResultStage 1 (start at NativeMethodAccessorImpl.java:0)
18/03/23 11:08:04 INFO DAGScheduler: Parents of final stage: List()
18/03/23 11:08:04 INFO DAGScheduler: Missing parents: List()
18/03/23 11:08:04 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[13] at start at NativeMethodAccessorImpl.java:0), which has no missing parents
18/03/23 11:08:04 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 16.9 KB, free 366.3 MB)
18/03/23 11:08:04 INFO MemoryStore:
Block broadcast_1_piece0 stored as bytes in memory (estimated size 7.8 KB, free 366.3 MB)
18/03/23 11:08:04 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.246.79.20:46249 (size: 7.8 KB, free: 366.3 MB)
18/03/23 11:08:04 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
18/03/23 11:08:04 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[13] at start at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))
18/03/23 11:08:04 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
18/03/23 11:08:04 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, executor driver, partition 0, PROCESS_LOCAL, 5001 bytes)
18/03/23 11:08:04 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
18/03/23 11:08:04 WARN CachedKafkaConsumer: CachedKafkaConsumer is not running in UninterruptibleThread. It may hang when CachedKafkaConsumer's methods are interrupted because of KAFKA-1894
18/03/23 11:08:04 INFO BufferedStreamThread: Loading required package: lattice
18/03/23 11:08:04 INFO BufferedStreamThread:
18/03/23 11:08:04 INFO BufferedStreamThread: Attaching package: ‘lattice’
18/03/23 11:08:04 INFO BufferedStreamThread:
18/03/23 11:08:04 INFO BufferedStreamThread: The following object is masked from ‘package:SparkR’:
18/03/23 11:08:04 INFO BufferedStreamThread:
18/03/23 11:08:04 INFO BufferedStreamThread:     histogram
18/03/23 11:08:04 INFO BufferedStreamThread:
18/03/23 11:08:04 INFO BufferedStreamThread: Loading required package: ggplot2
18/03/23 11:08:07 INFO BufferedStreamThread:    user  system elapsed
18/03/23 11:08:07 INFO BufferedStreamThread:   2.038   0.074   2.114
18/03/23 11:08:07 INFO RRunner: Times: boot = 0.011 s, init = 0.088 s, broadcast = 0.000 s, read-input = 0.001 s, compute = 2.176 s, write-output = 0.001 s, total = 2.277 s
18/03/23 11:08:07 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1154 bytes result sent to driver
18/03/23 11:08:07 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 2380 ms on localhost (executor driver) (1/1)
18/03/23 11:08:07 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
18/03/23 11:08:07 INFO DAGScheduler: ResultStage 1 (start at NativeMethodAccessorImpl.java:0) finished in 2.381 s
18/03/23 11:08:07 INFO DAGScheduler: Job 1 finished: start at NativeMethodAccessorImpl.java:0, took 2.395044 s
18/03/23 11:08:07 INFO StreamExecution: Streaming query made progress: { "id" : "1605966f-51e3-4df9-b284-7535e37b6d44", "runId" : "ab605bec-3d57-4a61-a45b-5174292ab2ec", "name" : null, "timestamp" : "2018-03-23T11:08:04.604Z", "numInputRows" : 1, "inputRowsPerSecond" : 37.03703703703704, "processedRowsPerSecond" : 0.3947887879984209, "durationMs" : { "addBatch" : 2422, "getBatch" : 9, "getOffset" : 15, "queryPlanning" : 8, "triggerExecution" : 2533, "walCommit" : 75 }, "stateOperators" : [ ], "sources" : [ { "description" : "KafkaSource[Subscribe[source]]", "startOffset" : { "source" : { "0" : 4647070 } }, "endOffset" : { "source" : { "0" : 4647071 } }, "numInputRows" : 1, "inputRowsPerSecond" : 37.03703703703704, "processedRowsPerSecond" : 0.3947887879984209 } ], "sink" : { "description" : "KafkaSink" } }
18/03/23 11:08:07 INFO StreamExecution: Streaming query made progress: { "id" : "1605966f-51e3-4df9-b284-7535e37b6d44", "runId" : "ab605bec-3d57-4a61-a45b-5174292ab2ec", "name" : null, "timestamp" : "2018-03-23T11:08:07.183Z", "numInputRows" : 0, "inputRowsPerSecond" : 0.0, "processedRowsPerSecond" : 0.0, "durationMs" : { "getOffset" : 17, "triggerExecution" : 17 }, "stateOperators" : [ ], "sources" : [ { "description" : "KafkaSource[Subscribe[source]]", "startOffset" : { "source" : { "0" : 4647071 } }, "endOffset" : { "source" : { "0" : 4647071 } }, "numInputRows" : 0, "inputRowsPerSecond" : 0.0, "processedRowsPerSecond" : 0.0 } ], "sink" : { "description" : "KafkaSink" } }
18/03/23 11:08:10 INFO StreamExecution: Committed offsets for batch 2. Metadata OffsetSeqMetadata(0,1521803290685,Map(spark.sql.shuffle.partitions -> 200))
18/03/23 11:08:10 INFO KafkaSource: GetBatch called with start = Some({"source":{"0":4647071}}), end = {"source":{"0":4647072}}
18/03/23 11:08:10 INFO KafkaSource: Partitions added: Map()
18/03/23 11:08:10 INFO KafkaSource: GetBatch generating RDD of offset range: KafkaSourceRDDOffsetRange(source-0,4647071,4647072,None)
18/03/23 11:08:10 INFO SparkContext: Starting job: start at NativeMethodAccessorImpl.java:0
18/03/23 11:08:10 INFO DAGScheduler: Got job 2 (start at NativeMethodAccessorImpl.java:0) with 1 output partitions
18/03/23 11:08:10 INFO DAGScheduler: Final stage: ResultStage 2 (start at NativeMethodAccessorImpl.java:0)
18/03/23 11:08:10 INFO DAGScheduler: Parents of final stage: List()
18/03/23 11:08:10 INFO DAGScheduler: Missing parents: List()
18/03/23 11:08:10 INFO DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[20] at start at NativeMethodAccessorImpl.java:0), which has no missing parents
18/03/23 11:08:10 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 16.9 KB, free 366.2 MB)
18/03/23 11:08:10 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 7.8 KB, free 366.2 MB)
18/03/23 11:08:10 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.246.79.20:46249 (size: 7.8 KB, free: 366.3 MB)
18/03/23 11:08:10 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1006
18/03/23 11:08:10 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (MapPartitionsRDD[20] at start at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))
18/03/23 11:08:10 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
18/03/23 11:08:10 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 2, localhost, executor driver, partition 0, PROCESS_LOCAL, 5001 bytes)
18/03/23 11:08:10 INFO Executor: Running task 0.0 in stage 2.0 (TID 2)
18/03/23 11:08:10 WARN CachedKafkaConsumer: CachedKafkaConsumer is not running in UninterruptibleThread. It may hang when CachedKafkaConsumer's methods are interrupted because of KAFKA-1894
18/03/23 11:08:10 INFO BufferedStreamThread: Loading required package: lattice
18/03/23 11:08:10 INFO BufferedStreamThread:
18/03/23 11:08:10 INFO BufferedStreamThread: Attaching package: ‘lattice’
18/03/23 11:08:10 INFO BufferedStreamThread:
18/03/23 11:08:10 INFO BufferedStreamThread: The following object is masked from ‘package:SparkR’:
18/03/23 11:08:10 INFO BufferedStreamThread:
18/03/23 11:08:10 INFO BufferedStreamThread:     histogram
18/03/23 11:08:10 INFO BufferedStreamThread:
18/03/23 11:08:10 INFO BufferedStreamThread: Loading required package: ggplot2
18/03/23 11:08:12 INFO BufferedStreamThread:    user  system elapsed
18/03/23 11:08:12 INFO BufferedStreamThread:   1.937   0.062   1.999
18/03/23 11:08:12 INFO RRunner: Times: boot = 0.009 s, init = 0.017 s, broadcast = 0.001 s, read-input = 0.001 s, compute = 2.064 s, write-output = 0.001 s, total = 2.093 s
18/03/23 11:08:12 INFO Executor: Finished task 0.0 in stage 2.0 (TID 2). 1154 bytes result sent to driver
18/03/23 11:08:12 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 2) in 2122 ms on localhost (executor driver) (1/1)
18/03/23 11:08:12 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
18/03/23 11:08:12 INFO DAGScheduler: ResultStage 2 (start at NativeMethodAccessorImpl.java:0) finished in 2.122 s
18/03/23 11:08:12 INFO DAGScheduler: Job 2 finished: start at NativeMethodAccessorImpl.java:0, took 2.137442 s
18/03/23 11:08:12 INFO StreamExecution: Streaming query made progress: { "id" : "1605966f-51e3-4df9-b284-7535e37b6d44", "runId" : "ab605bec-3d57-4a61-a45b-5174292ab2ec", "name" : null, "timestamp" : "2018-03-23T11:08:10.660Z", "numInputRows" : 1, "inputRowsPerSecond" : 27.027027027027028, "processedRowsPerSecond" : 0.4452359750667854, "durationMs" : { "addBatch" : 2170, "getBatch" : 9, "getOffset" : 24, "queryPlanning" : 9, "triggerExecution" : 2246, "walCommit" : 29 }, "stateOperators" : [ ], "sources" : [ { "description" : "KafkaSource[Subscribe[source]]", "startOffset" : { "source" : { "0" : 4647071 } }, "endOffset" : { "source" : { "0" : 4647072 } }, "numInputRows" : 1, "inputRowsPerSecond" : 27.027027027027028, "processedRowsPerSecond" : 0.4452359750667854 } ], "sink" : { "description" : "KafkaSink" } }
18/03/23 11:08:12 INFO StreamExecution: Streaming query made progress: { "id" : "1605966f-51e3-4df9-b284-7535e37b6d44", "runId" : "ab605bec-3d57-4a61-a45b-5174292ab2ec", "name" : null, "timestamp" : "2018-03-23T11:08:12.942Z", "numInputRows" : 0, "inputRowsPerSecond" : 0.0, "processedRowsPerSecond" : 0.0, "durationMs" : { "getOffset" : 12, "triggerExecution" : 12 }, "stateOperators" : [ ], "sources" : [ { "description" : "KafkaSource[Subscribe[source]]", "startOffset" : { "source" : { "0" : 4647072 } }, "endOffset" : { "source" : { "0" : 4647072 } }, "numInputRows" : 0, "inputRowsPerSecond" : 0.0, "processedRowsPerSecond" : 0.0 } ], "sink" : { "description" : "KafkaSink" } }
18/03/23 11:08:14 INFO StreamExecution: Committed offsets for batch 3. Metadata OffsetSeqMetadata(0,1521803294496,Map(spark.sql.shuffle.partitions -> 200))
18/03/23 11:08:14 INFO KafkaSource: GetBatch called with start = Some({"source":{"0":4647072}}), end = {"source":{"0":4647073}}
18/03/23 11:08:14 INFO KafkaSource: Partitions added: Map()
18/03/23 11:08:14 INFO BlockManagerInfo: Removed broadcast_2_piece0 on 10.246.79.20:46249 in memory (size: 7.8 KB, free: 366.3 MB)
18/03/23 11:08:14 INFO KafkaSource: GetBatch generating RDD of offset range: KafkaSourceRDDOffsetRange(source-0,4647072,4647073,None)
18/03/23 11:08:14 INFO BlockManagerInfo: Removed broadcast_0_piece0 on 10.246.79.20:46249 in memory (size: 7.8 KB, free: 366.3 MB)
18/03/23 11:08:14 INFO BlockManagerInfo: Removed broadcast_1_piece0 on 10.246.79.20:46249 in memory (size: 7.8 KB, free: 366.3 MB)
18/03/23 11:08:14 INFO SparkContext: Starting job: start at NativeMethodAccessorImpl.java:0
18/03/23 11:08:14 INFO DAGScheduler: Got job 3 (start at NativeMethodAccessorImpl.java:0) with 1 output partitions
18/03/23 11:08:14 INFO DAGScheduler: Final stage: ResultStage 3 (start at NativeMethodAccessorImpl.java:0)
18/03/23 11:08:14 INFO DAGScheduler: Parents of final stage: List()
18/03/23 11:08:14 INFO DAGScheduler: Missing parents: List()
18/03/23 11:08:14 INFO DAGScheduler: Submitting ResultStage 3 (MapPartitionsRDD[27] at start at NativeMethodAccessorImpl.java:0), which has no missing parents
18/03/23 11:08:14 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 16.9 KB, free 366.3 MB)
18/03/23 11:08:14 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 7.8 KB, free 366.3 MB)
18/03/23 11:08:14 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 10.246.79.20:46249 (size: 7.8 KB, free: 366.3 MB)
18/03/23 11:08:14 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1006
18/03/23 11:08:14 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 3 (MapPartitionsRDD[27] at start at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))
18/03/23 11:08:14 INFO TaskSchedulerImpl: Adding task set 3.0 with 1 tasks
18/03/23 11:08:14 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID 3, localhost, executor driver, partition 0, PROCESS_LOCAL, 5001 bytes)
18/03/23 11:08:14 INFO Executor: Running task 0.0 in stage 3.0 (TID 3)
18/03/23 11:08:14 WARN CachedKafkaConsumer: CachedKafkaConsumer is not running in UninterruptibleThread. It may hang when CachedKafkaConsumer's methods are interrupted because of KAFKA-1894
18/03/23 11:08:14 INFO BufferedStreamThread: Loading required package: lattice
18/03/23 11:08:14 INFO BufferedStreamThread:
18/03/23 11:08:14 INFO BufferedStreamThread: Attaching package: ‘lattice’
18/03/23 11:08:14 INFO BufferedStreamThread:
18/03/23 11:08:14 INFO BufferedStreamThread: The following object is masked from ‘package:SparkR’:
18/03/23 11:08:14 INFO BufferedStreamThread:
18/03/23 11:08:14 INFO BufferedStreamThread:     histogram
18/03/23 11:08:14 INFO BufferedStreamThread:
18/03/23 11:08:14 INFO BufferedStreamThread: Loading required package: ggplot2
18/03/23 11:08:16 INFO BufferedStreamThread:    user  system elapsed
18/03/23 11:08:16 INFO BufferedStreamThread:   1.916   0.068   1.987
18/03/23 11:08:16 INFO RRunner: Times: boot = 0.009 s, init = 0.019 s, broadcast = 0.001 s, read-input = 0.001 s, compute = 2.054 s, write-output = 0.001 s, total = 2.085 s
18/03/23 11:08:16 INFO Executor: Finished task 0.0 in stage 3.0 (TID 3). 1154 bytes result sent to driver
18/03/23 11:08:16 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID 3) in 2109 ms on localhost (executor driver) (1/1)
18/03/23 11:08:16 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool
18/03/23 11:08:16 INFO DAGScheduler: ResultStage 3 (start at NativeMethodAccessorImpl.java:0) finished in 2.110 s
18/03/23 11:08:16 INFO DAGScheduler: Job 3 finished: start at NativeMethodAccessorImpl.java:0, took 2.123274 s
18/03/23 11:08:16 INFO StreamExecution: Streaming query made progress: { "id" : "1605966f-51e3-4df9-b284-7535e37b6d44", "runId" : "ab605bec-3d57-4a61-a45b-5174292ab2ec", "name" : null, "timestamp" : "2018-03-23T11:08:14.485Z", "numInputRows" : 1, "inputRowsPerSecond" : 41.666666666666664, "processedRowsPerSecond" : 0.44622936189201245, "durationMs" : { "addBatch" : 2149, "getBatch" : 13, "getOffset" : 11, "queryPlanning" : 9, "triggerExecution" : 2241, "walCommit" : 54 }, "stateOperators" : [ ], "sources" : [ { "description" : "KafkaSource[Subscribe[source]]", "startOffset" : { "source" : { "0" : 4647072 } }, "endOffset" : { "source" : { "0" : 4647073 } }, "numInputRows" : 1, "inputRowsPerSecond" : 41.666666666666664, "processedRowsPerSecond" : 0.44622936189201245 } ], "sink" : { "description" : "KafkaSink" } }
18/03/23 11:08:16 INFO StreamExecution: Streaming query made progress: { "id" : "1605966f-51e3-4df9-b284-7535e37b6d44", "runId" : "ab605bec-3d57-4a61-a45b-5174292ab2ec", "name" : null, "timestamp" : "2018-03-23T11:08:16.758Z", "numInputRows" : 0, "inputRowsPerSecond" : 0.0, "processedRowsPerSecond" : 0.0, "durationMs" : { "getOffset" : 16, "triggerExecution" : 16 }, "stateOperators" : [ ], "sources" : [ { "description" : "KafkaSource[Subscribe[source]]", "startOffset" : { "source" : { "0" : 4647073 } }, "endOffset" : { "source" : { "0" : 4647073 } }, "numInputRows" : 0, "inputRowsPerSecond" : 0.0, "processedRowsPerSecond" : 0.0 } ], "sink" : { "description" : "KafkaSink" } }
18/03/23 11:08:19 INFO StreamExecution: Committed offsets for batch 4. Metadata OffsetSeqMetadata(0,1521803299499,Map(spark.sql.shuffle.partitions -> 200)) 18/03/23 11:08:19 INFO KafkaSource: GetBatch called with start = Some({"source":{"0":4647073}}), end = {"source":{"0":4647074}} 18/03/23 11:08:19 INFO KafkaSource: Partitions added: Map() 18/03/23 11:08:19 INFO KafkaSource: GetBatch generating RDD of offset range: KafkaSourceRDDOffsetRange(source-0,4647073,4647074,None) 18/03/23 11:08:19 INFO SparkContext: Starting job: start at NativeMethodAccessorImpl.java:0 18/03/23 11:08:19 INFO DAGScheduler: Got job 4 (start at NativeMethodAccessorImpl.java:0) with 1 output partitions 18/03/23 11:08:19 INFO DAGScheduler: Final stage: ResultStage 4 (start at NativeMethodAccessorImpl.java:0) 18/03/23 11:08:19 INFO DAGScheduler: Parents of final stage: List() 18/03/23 11:08:19 INFO DAGScheduler: Missing parents: List() 18/03/23 11:08:19 INFO DAGScheduler: Submitting ResultStage 4 (MapPartitionsRDD[34] at start at NativeMethodAccessorImpl.java:0), which has no missing parents 18/03/23 11:08:19 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 16.9 KB, free 366.3 MB) 18/03/23 11:08:19 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 7.8 KB, free 366.3 MB) 18/03/23 11:08:19 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on 10.246.79.20:46249 (size: 7.8 KB, free: 366.3 MB) 18/03/23 11:08:19 INFO SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:1006 18/03/23 11:08:19 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 4 (MapPartitionsRDD[34] at start at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0)) 18/03/23 11:08:19 INFO TaskSchedulerImpl: Adding task set 4.0 with 1 tasks 18/03/23 11:08:19 INFO TaskSetManager: Starting task 0.0 in stage 4.0 (TID 4, localhost, executor driver, partition 0, PROCESS_LOCAL, 5001 bytes) 18/03/23 11:08:19 INFO 
Executor: Running task 0.0 in stage 4.0 (TID 4) 18/03/23 11:08:19 WARN CachedKafkaConsumer: CachedKafkaConsumer is not running in UninterruptibleThread. It may hang when CachedKafkaConsumer's methods are interrupted because of KAFKA-1894 18/03/23 11:08:19 INFO BufferedStreamThread: Loading required package: lattice 18/03/23 11:08:19 INFO BufferedStreamThread: 18/03/23 11:08:19 INFO BufferedStreamThread: Attaching package: ‘lattice’ 18/03/23 11:08:19 INFO BufferedStreamThread: 18/03/23 11:08:19 INFO BufferedStreamThread: The following object is masked from ‘package:SparkR’: 18/03/23 11:08:19 INFO BufferedStreamThread: 18/03/23 11:08:19 INFO BufferedStreamThread: histogram 18/03/23 11:08:19 INFO BufferedStreamThread: 18/03/23 11:08:19 INFO BufferedStreamThread: Loading required package: ggplot2 18/03/23 11:08:21 INFO BufferedStreamThread: user system elapsed 18/03/23 11:08:21 INFO BufferedStreamThread: 1.931 0.071 2.002 18/03/23 11:08:21 INFO RRunner: Times: boot = 0.009 s, init = 0.017 s, broadcast = 0.000 s, read-input = 0.001 s, compute = 2.069 s, write-output = 0.002 s, total = 2.098 s 18/03/23 11:08:21 INFO Executor: Finished task 0.0 in stage 4.0 (TID 4). 
1154 bytes result sent to driver 18/03/23 11:08:21 INFO TaskSetManager: Finished task 0.0 in stage 4.0 (TID 4) in 2122 ms on localhost (executor driver) (1/1) 18/03/23 11:08:21 INFO TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool 18/03/23 11:08:21 INFO DAGScheduler: ResultStage 4 (start at NativeMethodAccessorImpl.java:0) finished in 2.124 s 18/03/23 11:08:21 INFO DAGScheduler: Job 4 finished: start at NativeMethodAccessorImpl.java:0, took 2.135279 s 18/03/23 11:08:21 INFO StreamExecution: Streaming query made progress: { "id" : "1605966f-51e3-4df9-b284-7535e37b6d44", "runId" : "ab605bec-3d57-4a61-a45b-5174292ab2ec", "name" : null, "timestamp" : "2018-03-23T11:08:19.488Z", "numInputRows" : 1, "inputRowsPerSecond" : 40.0, "processedRowsPerSecond" : 0.45004500450045004, "durationMs" : { "addBatch" : 2160, "getBatch" : 8, "getOffset" : 11, "queryPlanning" : 8, "triggerExecution" : 2222, "walCommit" : 32 }, "stateOperators" : [ ], "sources" : [ { "description" : "KafkaSource[Subscribe[source]]", "startOffset" : { "source" : { "0" : 4647073 } }, "endOffset" : { "source" : { "0" : 4647074 } }, "numInputRows" : 1, "inputRowsPerSecond" : 40.0, "processedRowsPerSecond" : 0.45004500450045004 } ], "sink" : { "description" : "KafkaSink" } } 18/03/23 11:08:21 INFO StreamExecution: Streaming query made progress: { "id" : "1605966f-51e3-4df9-b284-7535e37b6d44", "runId" : "ab605bec-3d57-4a61-a45b-5174292ab2ec", "name" : null, "timestamp" : "2018-03-23T11:08:21.747Z", "numInputRows" : 0, "inputRowsPerSecond" : 0.0, "processedRowsPerSecond" : 0.0, "durationMs" : { "getOffset" : 12, "triggerExecution" : 12 }, "stateOperators" : [ ], "sources" : [ { "description" : "KafkaSource[Subscribe[source]]", "startOffset" : { "source" : { "0" : 4647074 } }, "endOffset" : { "source" : { "0" : 4647074 } }, "numInputRows" : 0, "inputRowsPerSecond" : 0.0, "processedRowsPerSecond" : 0.0 } ], "sink" : { "description" : "KafkaSink" } } ^C Execution halted
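For what it's worth, the RRunner timings above show compute ≈ 2 s per micro-batch, essentially all of it spent attaching caret (and its lattice/ggplot2 dependencies) inside the dapply() closure: each task runs in a freshly started R worker, so the load cost recurs every batch. I haven't found a way to keep the R session warm across batches, but one mitigation I've been considering is amortizing the cost by widening the trigger interval, so each ~2 s package load covers many more rows. This is only a sketch using the same query as above; trigger.processingTime is the documented SparkR write.stream() option, but whether a 30-second interval is acceptable depends on your latency budget:

```r
# Sketch: amortize (not eliminate) the per-task library(caret) cost by
# processing larger micro-batches. Each batch still pays ~2 s once, but
# over all rows that arrived in the 30 s window instead of a single row.
q2 <- write.stream(
  df4, "kafka",
  checkpointLocation = loc,
  topic = "sink",
  kafka.bootstrap.servers = "10.117.172.48:9092",
  trigger.processingTime = "30 seconds"   # default fires as fast as possible
)
awaitTermination(q2)
```

Inside the closure, suppressPackageStartupMessages(library(caret)) would also quiet the lattice/ggplot2 attach messages in the worker logs, though it does not reduce the load time itself.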