I have a native R model and am running it with Structured Streaming. Data comes
from Kafka and goes into the dapply method, where my model makes predictions,
and the results are written to a sink.
Problem: my model requires the caret package. Inside the dapply function, caret
is loaded again for every streaming batch, which adds a delay of about 2 s.
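library(SparkR)
sparkR.session()  # sparkR.session() replaces the deprecated sparkR.init()
library(caret)

# Read the Kafka topic "source" as a stream; keep only the value, as a string
kafka <- read.stream("kafka", subscribe = "source",
                     kafka.bootstrap.servers = "10.117.172.48:9092")
lines <- select(kafka, cast(kafka$value, "string"))
schema <- schema(lines)

# This function runs in an R worker for each partition of every micro-batch;
# the timing shows what library(caret) costs inside that worker
df4 <- dapply(lines, function(x) {
  print(system.time(library(caret)))
  x
}, schema)

# loc is the checkpoint directory (defined elsewhere)
q2 <- write.stream(df4, "kafka", checkpointLocation = loc, topic = "sink",
                   kafka.bootstrap.servers = "10.117.172.48:9092")
awaitTermination(q2)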
With the code above, every new batch produces this output:
18/03/23 11:08:10 INFO BufferedStreamThread: Loading required package: lattice
18/03/23 11:08:10 INFO BufferedStreamThread:
18/03/23 11:08:10 INFO BufferedStreamThread: Attaching package: ‘lattice’
18/03/23 11:08:10 INFO BufferedStreamThread:
18/03/23 11:08:10 INFO BufferedStreamThread: The following object is masked from ‘package:SparkR’:
18/03/23 11:08:10 INFO BufferedStreamThread:
18/03/23 11:08:10 INFO BufferedStreamThread: histogram
18/03/23 11:08:10 INFO BufferedStreamThread:
18/03/23 11:08:10 INFO BufferedStreamThread: Loading required package: ggplot2
18/03/23 11:08:12 INFO BufferedStreamThread: user system elapsed
18/03/23 11:08:12 INFO BufferedStreamThread: 1.937 0.062 1.999
18/03/23 11:08:12 INFO RRunner: Times: boot = 0.009 s, init = 0.017 s, broadcast = 0.001 s, read-input = 0.001 s, compute = 2.064 s, write-output = 0.001 s, total = 2.093 s
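The elapsed time above (about 2 s) is what a first load of caret costs. For comparison, in a single long-lived R session a second library() call on an already-attached package returns almost immediately, because library() checks the search path before loading anything. A minimal sketch to see this locally, using the base package splines as a stand-in for caret (an assumption; any installed package that is not attached by default would do):

```r
# Time library() twice in the same R session.
# "splines" stands in for caret (assumption: it ships with R but is not
# attached by default, so the first call really loads it).
time_library <- function(pkg) {
  t <- system.time(library(pkg, character.only = TRUE))
  unname(t["elapsed"])
}

first  <- time_library("splines")  # loads the namespace and attaches it
second <- time_library("splines")  # already on search(): returns early
cat(sprintf("first call: %.3f s, second call: %.3f s\n", first, second))
```

If the full cost shows up on every batch in the streaming job, the R process running the dapply function is most likely not the same one from batch to batch.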
The rest of the log is attached below.
Ideally, the packages should not be loaded again. I think the R environment is
being created and destroyed for each batch of the query. Is there a solution to
this, or am I missing something here?
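One way to check that suspicion is to log the R process ID next to the load inside the dapply function: if the PID changes from batch to batch, each batch runs in a new R process and anything attached by the previous batch is gone. A minimal sketch, where load_once() is a hypothetical helper (not part of SparkR or caret) and the base package tools stands in for caret:

```r
# Hypothetical helper: attach a package only if this R process has not
# attached it yet, and report the process ID and what happened.
load_once <- function(pkg) {
  already <- paste0("package:", pkg) %in% search()
  if (!already) {
    suppressPackageStartupMessages(library(pkg, character.only = TRUE))
  }
  message(sprintf("pid %d: %s %s", Sys.getpid(), pkg,
                  if (already) "was already attached" else "attached now"))
  !already  # TRUE only when this call paid the loading cost
}

# Local check: the second call in the same process skips the load.
first  <- load_once("tools")
second <- load_once("tools")

# In the streaming job it would replace the bare library() call, e.g.
# (assumption: SparkR ships load_once to the worker along with the closure):
# df4 <- dapply(lines, function(x) { load_once("caret"); x }, schema)
```

If the PID stays the same across batches, load_once() pays the loading cost only on the first batch; if it changes every time, the guard cannot help and the cost comes from starting a fresh R process.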
Thanks,
Deepansh
Java ref type org.apache.spark.sql.SparkSession id 1
Re-using existing Spark Context. Call sparkR.session.stop() or restart R to create a new Spark Context
Warning message:
'sparkR.init' is deprecated.
Use 'sparkR.session' instead.
See help("Deprecated")
Loading required package: lattice
Attaching package: ‘lattice’
The following object is masked from ‘package:SparkR’:
histogram
Loading required package: ggplot2
18/03/23 11:07:58 INFO StreamExecution: Starting [id = 1605966f-51e3-4df9-b284-7535e37b6d44, runId = ab605bec-3d57-4a61-a45b-5174292ab2ec]. Use /tmp/temporary-%20Gunp5pUYnV to store the query checkpoint.
18/03/23 11:07:58 INFO ConsumerConfig: ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [10.117.172.48:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = spark-kafka-source-a46c9dc7-36fc-40d3-a5c3-28d566108ab3--149645343-driver-0
heartbeat.interval.ms = 3000
interceptor.classes = null
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 1
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
18/03/23 11:07:58 INFO ConsumerConfig: ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [10.117.172.48:9092]
check.crcs = true
client.id = consumer-1
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = spark-kafka-source-a46c9dc7-36fc-40d3-a5c3-28d566108ab3--149645343-driver-0
heartbeat.interval.ms = 3000
interceptor.classes = null
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 1
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
18/03/23 11:07:59 INFO AppInfoParser: Kafka version : 0.10.2.0
18/03/23 11:07:59 INFO AppInfoParser: Kafka commitId : 576d93a8dc0cf421
18/03/23 11:07:59 INFO SessionState: Created local directory: /tmp/fa519f67-cbad-4691-a260-744dd9c9a432_resources
18/03/23 11:07:59 INFO SessionState: Created HDFS directory: /tmp/hive/d0g00m3/fa519f67-cbad-4691-a260-744dd9c9a432
18/03/23 11:07:59 INFO SessionState: Created local directory: /tmp/d0g00m3/fa519f67-cbad-4691-a260-744dd9c9a432
18/03/23 11:07:59 INFO SessionState: Created HDFS directory: /tmp/hive/d0g00m3/fa519f67-cbad-4691-a260-744dd9c9a432/_tmp_space.db
18/03/23 11:07:59 INFO HiveClientImpl: Warehouse location for Hive client (version 1.2.1) is file:/u/users/d0g00m3/spark-warehouse/
18/03/23 11:07:59 INFO StreamExecution: Starting new streaming query.
18/03/23 11:07:59 INFO AbstractCoordinator: Discovered coordinator 10.117.172.48:9092 (id: 2147483647 rack: null) for group spark-kafka-source-a46c9dc7-36fc-40d3-a5c3-28d566108ab3--149645343-driver-0.
18/03/23 11:07:59 INFO ConsumerCoordinator: Revoking previously assigned partitions [] for group spark-kafka-source-a46c9dc7-36fc-40d3-a5c3-28d566108ab3--149645343-driver-0
18/03/23 11:07:59 INFO AbstractCoordinator: (Re-)joining group spark-kafka-source-a46c9dc7-36fc-40d3-a5c3-28d566108ab3--149645343-driver-0
18/03/23 11:07:59 INFO AbstractCoordinator: Successfully joined group spark-kafka-source-a46c9dc7-36fc-40d3-a5c3-28d566108ab3--149645343-driver-0 with generation 1
18/03/23 11:07:59 INFO ConsumerCoordinator: Setting newly assigned partitions [source-0] for group spark-kafka-source-a46c9dc7-36fc-40d3-a5c3-28d566108ab3--149645343-driver-0
18/03/23 11:07:59 INFO KafkaSource: Initial offsets: {"source":{"0":4647070}}
18/03/23 11:07:59 INFO StreamExecution: Committed offsets for batch 0. Metadata OffsetSeqMetadata(0,1521803279883,Map(spark.sql.shuffle.partitions -> 200))
18/03/23 11:07:59 INFO KafkaSource: GetBatch called with start = None, end = {"source":{"0":4647070}}
18/03/23 11:07:59 INFO KafkaSource: Partitions added: Map()
18/03/23 11:08:00 INFO KafkaSource: GetBatch generating RDD of offset range: KafkaSourceRDDOffsetRange(source-0,4647070,4647070,None)
18/03/23 11:08:00 INFO CodeGenerator: Code generated in 222.605008 ms
18/03/23 11:08:00 INFO CodeGenerator: Code generated in 12.444683 ms
18/03/23 11:08:00 INFO SparkContext: Starting job: start at NativeMethodAccessorImpl.java:0
18/03/23 11:08:00 INFO DAGScheduler: Got job 0 (start at NativeMethodAccessorImpl.java:0) with 1 output partitions
18/03/23 11:08:00 INFO DAGScheduler: Final stage: ResultStage 0 (start at NativeMethodAccessorImpl.java:0)
18/03/23 11:08:00 INFO DAGScheduler: Parents of final stage: List()
18/03/23 11:08:00 INFO DAGScheduler: Missing parents: List()
18/03/23 11:08:00 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[6] at start at NativeMethodAccessorImpl.java:0), which has no missing parents
18/03/23 11:08:00 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 16.9 KB, free 366.3 MB)
18/03/23 11:08:00 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 7.8 KB, free 366.3 MB)
18/03/23 11:08:00 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.246.79.20:46249 (size: 7.8 KB, free: 366.3 MB)
18/03/23 11:08:00 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
18/03/23 11:08:00 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[6] at start at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))
18/03/23 11:08:00 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
18/03/23 11:08:00 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 5001 bytes)
18/03/23 11:08:00 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
18/03/23 11:08:00 INFO Executor: Fetching file:/u/users/d0g00m3/testlib.R with timestamp 1521803266773
18/03/23 11:08:00 INFO Utils: /u/users/d0g00m3/testlib.R has been previously copied to /tmp/spark-e2d1b965-a3dc-4847-82ce-b09b8f7e2b76/userFiles-f7e87ef1-3a54-4f3f-8ee2-fa10278d0227/testlib.R
18/03/23 11:08:00 INFO Executor: Fetching spark://10.246.79.20:57945/jars/org.apache.kafka_kafka-clients-0.10.2.0.jar with timestamp 1521803266547
18/03/23 11:08:00 INFO TransportClientFactory: Successfully created connection to /10.246.79.20:57945 after 10 ms (0 ms spent in bootstraps)
18/03/23 11:08:00 INFO Utils: Fetching spark://10.246.79.20:57945/jars/org.apache.kafka_kafka-clients-0.10.2.0.jar to /tmp/spark-e2d1b965-a3dc-4847-82ce-b09b8f7e2b76/userFiles-f7e87ef1-3a54-4f3f-8ee2-fa10278d0227/fetchFileTemp2480019594959116903.tmp
18/03/23 11:08:00 INFO Executor: Adding file:/tmp/spark-e2d1b965-a3dc-4847-82ce-b09b8f7e2b76/userFiles-f7e87ef1-3a54-4f3f-8ee2-fa10278d0227/org.apache.kafka_kafka-clients-0.10.2.0.jar to class loader
18/03/23 11:08:00 INFO Executor: Fetching spark://10.246.79.20:57945/jars/org.apache.spark_spark-sql-kafka-0-10_2.11-2.2.0.jar with timestamp 1521803266546
18/03/23 11:08:00 INFO Utils: Fetching spark://10.246.79.20:57945/jars/org.apache.spark_spark-sql-kafka-0-10_2.11-2.2.0.jar to /tmp/spark-e2d1b965-a3dc-4847-82ce-b09b8f7e2b76/userFiles-f7e87ef1-3a54-4f3f-8ee2-fa10278d0227/fetchFileTemp8966057625455886086.tmp
18/03/23 11:08:00 INFO Executor: Adding file:/tmp/spark-e2d1b965-a3dc-4847-82ce-b09b8f7e2b76/userFiles-f7e87ef1-3a54-4f3f-8ee2-fa10278d0227/org.apache.spark_spark-sql-kafka-0-10_2.11-2.2.0.jar to class loader
18/03/23 11:08:01 INFO ConsumerConfig: ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = none
bootstrap.servers = [10.117.172.48:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = spark-kafka-source-a46c9dc7-36fc-40d3-a5c3-28d566108ab3--149645343-executor
heartbeat.interval.ms = 3000
interceptor.classes = null
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
18/03/23 11:08:01 INFO ConsumerConfig: ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = none
bootstrap.servers = [10.117.172.48:9092]
check.crcs = true
client.id = consumer-2
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = spark-kafka-source-a46c9dc7-36fc-40d3-a5c3-28d566108ab3--149645343-executor
heartbeat.interval.ms = 3000
interceptor.classes = null
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
18/03/23 11:08:01 INFO AppInfoParser: Kafka version : 0.10.2.0
18/03/23 11:08:01 INFO AppInfoParser: Kafka commitId : 576d93a8dc0cf421
18/03/23 11:08:01 INFO KafkaSourceRDD: Beginning offset 4647070 is the same as ending offset skipping source 0
18/03/23 11:08:01 INFO CodeGenerator: Code generated in 24.246571 ms
18/03/23 11:08:01 INFO CodeGenerator: Code generated in 14.032234 ms
18/03/23 11:08:01 INFO RRunner: Times: boot = 0.718 s, init = 0.009 s, broadcast = 0.000 s, read-input = 0.000 s, compute = 0.000 s, write-output = 0.000 s, total = 0.727 s
18/03/23 11:08:01 INFO CodeGenerator: Code generated in 13.304316 ms
18/03/23 11:08:01 INFO ProducerConfig: ProducerConfig values:
acks = 1
batch.size = 16384
block.on.buffer.full = false
bootstrap.servers = [10.117.172.48:9092]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.fetch.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
timeout.ms = 30000
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
18/03/23 11:08:01 INFO AppInfoParser: Kafka version : 0.10.2.0
18/03/23 11:08:01 INFO AppInfoParser: Kafka commitId : 576d93a8dc0cf421
18/03/23 11:08:01 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1154 bytes result sent to driver
18/03/23 11:08:01 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1096 ms on localhost (executor driver) (1/1)
18/03/23 11:08:01 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
18/03/23 11:08:01 INFO DAGScheduler: ResultStage 0 (start at NativeMethodAccessorImpl.java:0) finished in 1.128 s
18/03/23 11:08:01 INFO DAGScheduler: Job 0 finished: start at NativeMethodAccessorImpl.java:0, took 1.335817 s
18/03/23 11:08:01 INFO StreamExecution: Streaming query made progress: {
"id" : "1605966f-51e3-4df9-b284-7535e37b6d44",
"runId" : "ab605bec-3d57-4a61-a45b-5174292ab2ec",
"name" : null,
"timestamp" : "2018-03-23T11:07:59.157Z",
"numInputRows" : 0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"addBatch" : 1754,
"getBatch" : 115,
"getOffset" : 715,
"queryPlanning" : 135,
"triggerExecution" : 2782,
"walCommit" : 43
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KafkaSource[Subscribe[source]]",
"startOffset" : null,
"endOffset" : {
"source" : {
"0" : 4647070
}
},
"numInputRows" : 0,
"processedRowsPerSecond" : 0.0
} ],
"sink" : {
"description" : "KafkaSink"
}
}
18/03/23 11:08:02 INFO StreamExecution: Streaming query made progress: {
"id" : "1605966f-51e3-4df9-b284-7535e37b6d44",
"runId" : "ab605bec-3d57-4a61-a45b-5174292ab2ec",
"name" : null,
"timestamp" : "2018-03-23T11:08:02.197Z",
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"getOffset" : 15,
"triggerExecution" : 16
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KafkaSource[Subscribe[source]]",
"startOffset" : {
"source" : {
"0" : 4647070
}
},
"endOffset" : {
"source" : {
"0" : 4647070
}
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0
} ],
"sink" : {
"description" : "KafkaSink"
}
}
18/03/23 11:08:04 INFO StreamExecution: Committed offsets for batch 1. Metadata OffsetSeqMetadata(0,1521803284620,Map(spark.sql.shuffle.partitions -> 200))
18/03/23 11:08:04 INFO KafkaSource: GetBatch called with start = Some({"source":{"0":4647070}}), end = {"source":{"0":4647071}}
18/03/23 11:08:04 INFO KafkaSource: Partitions added: Map()
18/03/23 11:08:04 INFO KafkaSource: GetBatch generating RDD of offset range: KafkaSourceRDDOffsetRange(source-0,4647070,4647071,None)
18/03/23 11:08:04 INFO SparkContext: Starting job: start at NativeMethodAccessorImpl.java:0
18/03/23 11:08:04 INFO DAGScheduler: Got job 1 (start at NativeMethodAccessorImpl.java:0) with 1 output partitions
18/03/23 11:08:04 INFO DAGScheduler: Final stage: ResultStage 1 (start at NativeMethodAccessorImpl.java:0)
18/03/23 11:08:04 INFO DAGScheduler: Parents of final stage: List()
18/03/23 11:08:04 INFO DAGScheduler: Missing parents: List()
18/03/23 11:08:04 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[13] at start at NativeMethodAccessorImpl.java:0), which has no missing parents
18/03/23 11:08:04 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 16.9 KB, free 366.3 MB)
18/03/23 11:08:04 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 7.8 KB, free 366.3 MB)
18/03/23 11:08:04 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.246.79.20:46249 (size: 7.8 KB, free: 366.3 MB)
18/03/23 11:08:04 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
18/03/23 11:08:04 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[13] at start at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))
18/03/23 11:08:04 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
18/03/23 11:08:04 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, executor driver, partition 0, PROCESS_LOCAL, 5001 bytes)
18/03/23 11:08:04 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
18/03/23 11:08:04 WARN CachedKafkaConsumer: CachedKafkaConsumer is not running in UninterruptibleThread. It may hang when CachedKafkaConsumer's methods are interrupted because of KAFKA-1894
18/03/23 11:08:04 INFO BufferedStreamThread: Loading required package: lattice
18/03/23 11:08:04 INFO BufferedStreamThread:
18/03/23 11:08:04 INFO BufferedStreamThread: Attaching package: ‘lattice’
18/03/23 11:08:04 INFO BufferedStreamThread:
18/03/23 11:08:04 INFO BufferedStreamThread: The following object is masked from ‘package:SparkR’:
18/03/23 11:08:04 INFO BufferedStreamThread:
18/03/23 11:08:04 INFO BufferedStreamThread: histogram
18/03/23 11:08:04 INFO BufferedStreamThread:
18/03/23 11:08:04 INFO BufferedStreamThread: Loading required package: ggplot2
18/03/23 11:08:07 INFO BufferedStreamThread: user system elapsed
18/03/23 11:08:07 INFO BufferedStreamThread: 2.038 0.074 2.114
18/03/23 11:08:07 INFO RRunner: Times: boot = 0.011 s, init = 0.088 s, broadcast = 0.000 s, read-input = 0.001 s, compute = 2.176 s, write-output = 0.001 s, total = 2.277 s
18/03/23 11:08:07 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1154 bytes result sent to driver
18/03/23 11:08:07 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 2380 ms on localhost (executor driver) (1/1)
18/03/23 11:08:07 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
18/03/23 11:08:07 INFO DAGScheduler: ResultStage 1 (start at NativeMethodAccessorImpl.java:0) finished in 2.381 s
18/03/23 11:08:07 INFO DAGScheduler: Job 1 finished: start at NativeMethodAccessorImpl.java:0, took 2.395044 s
18/03/23 11:08:07 INFO StreamExecution: Streaming query made progress: {
"id" : "1605966f-51e3-4df9-b284-7535e37b6d44",
"runId" : "ab605bec-3d57-4a61-a45b-5174292ab2ec",
"name" : null,
"timestamp" : "2018-03-23T11:08:04.604Z",
"numInputRows" : 1,
"inputRowsPerSecond" : 37.03703703703704,
"processedRowsPerSecond" : 0.3947887879984209,
"durationMs" : {
"addBatch" : 2422,
"getBatch" : 9,
"getOffset" : 15,
"queryPlanning" : 8,
"triggerExecution" : 2533,
"walCommit" : 75
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KafkaSource[Subscribe[source]]",
"startOffset" : {
"source" : {
"0" : 4647070
}
},
"endOffset" : {
"source" : {
"0" : 4647071
}
},
"numInputRows" : 1,
"inputRowsPerSecond" : 37.03703703703704,
"processedRowsPerSecond" : 0.3947887879984209
} ],
"sink" : {
"description" : "KafkaSink"
}
}
18/03/23 11:08:07 INFO StreamExecution: Streaming query made progress: {
"id" : "1605966f-51e3-4df9-b284-7535e37b6d44",
"runId" : "ab605bec-3d57-4a61-a45b-5174292ab2ec",
"name" : null,
"timestamp" : "2018-03-23T11:08:07.183Z",
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"getOffset" : 17,
"triggerExecution" : 17
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KafkaSource[Subscribe[source]]",
"startOffset" : {
"source" : {
"0" : 4647071
}
},
"endOffset" : {
"source" : {
"0" : 4647071
}
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0
} ],
"sink" : {
"description" : "KafkaSink"
}
}
18/03/23 11:08:10 INFO StreamExecution: Committed offsets for batch 2. Metadata OffsetSeqMetadata(0,1521803290685,Map(spark.sql.shuffle.partitions -> 200))
18/03/23 11:08:10 INFO KafkaSource: GetBatch called with start = Some({"source":{"0":4647071}}), end = {"source":{"0":4647072}}
18/03/23 11:08:10 INFO KafkaSource: Partitions added: Map()
18/03/23 11:08:10 INFO KafkaSource: GetBatch generating RDD of offset range: KafkaSourceRDDOffsetRange(source-0,4647071,4647072,None)
18/03/23 11:08:10 INFO SparkContext: Starting job: start at NativeMethodAccessorImpl.java:0
18/03/23 11:08:10 INFO DAGScheduler: Got job 2 (start at NativeMethodAccessorImpl.java:0) with 1 output partitions
18/03/23 11:08:10 INFO DAGScheduler: Final stage: ResultStage 2 (start at NativeMethodAccessorImpl.java:0)
18/03/23 11:08:10 INFO DAGScheduler: Parents of final stage: List()
18/03/23 11:08:10 INFO DAGScheduler: Missing parents: List()
18/03/23 11:08:10 INFO DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[20] at start at NativeMethodAccessorImpl.java:0), which has no missing parents
18/03/23 11:08:10 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 16.9 KB, free 366.2 MB)
18/03/23 11:08:10 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 7.8 KB, free 366.2 MB)
18/03/23 11:08:10 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.246.79.20:46249 (size: 7.8 KB, free: 366.3 MB)
18/03/23 11:08:10 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1006
18/03/23 11:08:10 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (MapPartitionsRDD[20] at start at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))
18/03/23 11:08:10 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
18/03/23 11:08:10 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 2, localhost, executor driver, partition 0, PROCESS_LOCAL, 5001 bytes)
18/03/23 11:08:10 INFO Executor: Running task 0.0 in stage 2.0 (TID 2)
18/03/23 11:08:10 WARN CachedKafkaConsumer: CachedKafkaConsumer is not running in UninterruptibleThread. It may hang when CachedKafkaConsumer's methods are interrupted because of KAFKA-1894
18/03/23 11:08:10 INFO BufferedStreamThread: Loading required package: lattice
18/03/23 11:08:10 INFO BufferedStreamThread:
18/03/23 11:08:10 INFO BufferedStreamThread: Attaching package: ‘lattice’
18/03/23 11:08:10 INFO BufferedStreamThread:
18/03/23 11:08:10 INFO BufferedStreamThread: The following object is masked from ‘package:SparkR’:
18/03/23 11:08:10 INFO BufferedStreamThread:
18/03/23 11:08:10 INFO BufferedStreamThread: histogram
18/03/23 11:08:10 INFO BufferedStreamThread:
18/03/23 11:08:10 INFO BufferedStreamThread: Loading required package: ggplot2
18/03/23 11:08:12 INFO BufferedStreamThread: user system elapsed
18/03/23 11:08:12 INFO BufferedStreamThread: 1.937 0.062 1.999
18/03/23 11:08:12 INFO RRunner: Times: boot = 0.009 s, init = 0.017 s, broadcast = 0.001 s, read-input = 0.001 s, compute = 2.064 s, write-output = 0.001 s, total = 2.093 s
18/03/23 11:08:12 INFO Executor: Finished task 0.0 in stage 2.0 (TID 2). 1154 bytes result sent to driver
18/03/23 11:08:12 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 2) in 2122 ms on localhost (executor driver) (1/1)
18/03/23 11:08:12 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
18/03/23 11:08:12 INFO DAGScheduler: ResultStage 2 (start at NativeMethodAccessorImpl.java:0) finished in 2.122 s
18/03/23 11:08:12 INFO DAGScheduler: Job 2 finished: start at NativeMethodAccessorImpl.java:0, took 2.137442 s
18/03/23 11:08:12 INFO StreamExecution: Streaming query made progress: {
"id" : "1605966f-51e3-4df9-b284-7535e37b6d44",
"runId" : "ab605bec-3d57-4a61-a45b-5174292ab2ec",
"name" : null,
"timestamp" : "2018-03-23T11:08:10.660Z",
"numInputRows" : 1,
"inputRowsPerSecond" : 27.027027027027028,
"processedRowsPerSecond" : 0.4452359750667854,
"durationMs" : {
"addBatch" : 2170,
"getBatch" : 9,
"getOffset" : 24,
"queryPlanning" : 9,
"triggerExecution" : 2246,
"walCommit" : 29
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KafkaSource[Subscribe[source]]",
"startOffset" : {
"source" : {
"0" : 4647071
}
},
"endOffset" : {
"source" : {
"0" : 4647072
}
},
"numInputRows" : 1,
"inputRowsPerSecond" : 27.027027027027028,
"processedRowsPerSecond" : 0.4452359750667854
} ],
"sink" : {
"description" : "KafkaSink"
}
}
18/03/23 11:08:12 INFO StreamExecution: Streaming query made progress: {
"id" : "1605966f-51e3-4df9-b284-7535e37b6d44",
"runId" : "ab605bec-3d57-4a61-a45b-5174292ab2ec",
"name" : null,
"timestamp" : "2018-03-23T11:08:12.942Z",
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"getOffset" : 12,
"triggerExecution" : 12
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KafkaSource[Subscribe[source]]",
"startOffset" : {
"source" : {
"0" : 4647072
}
},
"endOffset" : {
"source" : {
"0" : 4647072
}
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0
} ],
"sink" : {
"description" : "KafkaSink"
}
}
18/03/23 11:08:14 INFO StreamExecution: Committed offsets for batch 3. Metadata OffsetSeqMetadata(0,1521803294496,Map(spark.sql.shuffle.partitions -> 200))
18/03/23 11:08:14 INFO KafkaSource: GetBatch called with start = Some({"source":{"0":4647072}}), end = {"source":{"0":4647073}}
18/03/23 11:08:14 INFO KafkaSource: Partitions added: Map()
18/03/23 11:08:14 INFO BlockManagerInfo: Removed broadcast_2_piece0 on 10.246.79.20:46249 in memory (size: 7.8 KB, free: 366.3 MB)
18/03/23 11:08:14 INFO KafkaSource: GetBatch generating RDD of offset range: KafkaSourceRDDOffsetRange(source-0,4647072,4647073,None)
18/03/23 11:08:14 INFO BlockManagerInfo: Removed broadcast_0_piece0 on 10.246.79.20:46249 in memory (size: 7.8 KB, free: 366.3 MB)
18/03/23 11:08:14 INFO BlockManagerInfo: Removed broadcast_1_piece0 on 10.246.79.20:46249 in memory (size: 7.8 KB, free: 366.3 MB)
18/03/23 11:08:14 INFO SparkContext: Starting job: start at NativeMethodAccessorImpl.java:0
18/03/23 11:08:14 INFO DAGScheduler: Got job 3 (start at NativeMethodAccessorImpl.java:0) with 1 output partitions
18/03/23 11:08:14 INFO DAGScheduler: Final stage: ResultStage 3 (start at NativeMethodAccessorImpl.java:0)
18/03/23 11:08:14 INFO DAGScheduler: Parents of final stage: List()
18/03/23 11:08:14 INFO DAGScheduler: Missing parents: List()
18/03/23 11:08:14 INFO DAGScheduler: Submitting ResultStage 3 (MapPartitionsRDD[27] at start at NativeMethodAccessorImpl.java:0), which has no missing parents
18/03/23 11:08:14 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 16.9 KB, free 366.3 MB)
18/03/23 11:08:14 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 7.8 KB, free 366.3 MB)
18/03/23 11:08:14 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 10.246.79.20:46249 (size: 7.8 KB, free: 366.3 MB)
18/03/23 11:08:14 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1006
18/03/23 11:08:14 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 3 (MapPartitionsRDD[27] at start at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))
18/03/23 11:08:14 INFO TaskSchedulerImpl: Adding task set 3.0 with 1 tasks
18/03/23 11:08:14 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID 3, localhost, executor driver, partition 0, PROCESS_LOCAL, 5001 bytes)
18/03/23 11:08:14 INFO Executor: Running task 0.0 in stage 3.0 (TID 3)
18/03/23 11:08:14 WARN CachedKafkaConsumer: CachedKafkaConsumer is not running in UninterruptibleThread. It may hang when CachedKafkaConsumer's methods are interrupted because of KAFKA-1894
18/03/23 11:08:14 INFO BufferedStreamThread: Loading required package: lattice
18/03/23 11:08:14 INFO BufferedStreamThread:
18/03/23 11:08:14 INFO BufferedStreamThread: Attaching package: ‘lattice’
18/03/23 11:08:14 INFO BufferedStreamThread:
18/03/23 11:08:14 INFO BufferedStreamThread: The following object is masked from ‘package:SparkR’:
18/03/23 11:08:14 INFO BufferedStreamThread:
18/03/23 11:08:14 INFO BufferedStreamThread: histogram
18/03/23 11:08:14 INFO BufferedStreamThread:
18/03/23 11:08:14 INFO BufferedStreamThread: Loading required package: ggplot2
18/03/23 11:08:16 INFO BufferedStreamThread: user system elapsed
18/03/23 11:08:16 INFO BufferedStreamThread: 1.916 0.068 1.987
18/03/23 11:08:16 INFO RRunner: Times: boot = 0.009 s, init = 0.019 s, broadcast = 0.001 s, read-input = 0.001 s, compute = 2.054 s, write-output = 0.001 s, total = 2.085 s
18/03/23 11:08:16 INFO Executor: Finished task 0.0 in stage 3.0 (TID 3). 1154 bytes result sent to driver
18/03/23 11:08:16 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID 3) in 2109 ms on localhost (executor driver) (1/1)
18/03/23 11:08:16 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool
18/03/23 11:08:16 INFO DAGScheduler: ResultStage 3 (start at NativeMethodAccessorImpl.java:0) finished in 2.110 s
18/03/23 11:08:16 INFO DAGScheduler: Job 3 finished: start at NativeMethodAccessorImpl.java:0, took 2.123274 s
18/03/23 11:08:16 INFO StreamExecution: Streaming query made progress: {
"id" : "1605966f-51e3-4df9-b284-7535e37b6d44",
"runId" : "ab605bec-3d57-4a61-a45b-5174292ab2ec",
"name" : null,
"timestamp" : "2018-03-23T11:08:14.485Z",
"numInputRows" : 1,
"inputRowsPerSecond" : 41.666666666666664,
"processedRowsPerSecond" : 0.44622936189201245,
"durationMs" : {
"addBatch" : 2149,
"getBatch" : 13,
"getOffset" : 11,
"queryPlanning" : 9,
"triggerExecution" : 2241,
"walCommit" : 54
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KafkaSource[Subscribe[source]]",
"startOffset" : {
"source" : {
"0" : 4647072
}
},
"endOffset" : {
"source" : {
"0" : 4647073
}
},
"numInputRows" : 1,
"inputRowsPerSecond" : 41.666666666666664,
"processedRowsPerSecond" : 0.44622936189201245
} ],
"sink" : {
"description" : "KafkaSink"
}
}
18/03/23 11:08:16 INFO StreamExecution: Streaming query made progress: {
"id" : "1605966f-51e3-4df9-b284-7535e37b6d44",
"runId" : "ab605bec-3d57-4a61-a45b-5174292ab2ec",
"name" : null,
"timestamp" : "2018-03-23T11:08:16.758Z",
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"getOffset" : 16,
"triggerExecution" : 16
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KafkaSource[Subscribe[source]]",
"startOffset" : {
"source" : {
"0" : 4647073
}
},
"endOffset" : {
"source" : {
"0" : 4647073
}
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0
} ],
"sink" : {
"description" : "KafkaSink"
}
}
18/03/23 11:08:19 INFO StreamExecution: Committed offsets for batch 4. Metadata OffsetSeqMetadata(0,1521803299499,Map(spark.sql.shuffle.partitions -> 200))
18/03/23 11:08:19 INFO KafkaSource: GetBatch called with start = Some({"source":{"0":4647073}}), end = {"source":{"0":4647074}}
18/03/23 11:08:19 INFO KafkaSource: Partitions added: Map()
18/03/23 11:08:19 INFO KafkaSource: GetBatch generating RDD of offset range: KafkaSourceRDDOffsetRange(source-0,4647073,4647074,None)
18/03/23 11:08:19 INFO SparkContext: Starting job: start at NativeMethodAccessorImpl.java:0
18/03/23 11:08:19 INFO DAGScheduler: Got job 4 (start at NativeMethodAccessorImpl.java:0) with 1 output partitions
18/03/23 11:08:19 INFO DAGScheduler: Final stage: ResultStage 4 (start at NativeMethodAccessorImpl.java:0)
18/03/23 11:08:19 INFO DAGScheduler: Parents of final stage: List()
18/03/23 11:08:19 INFO DAGScheduler: Missing parents: List()
18/03/23 11:08:19 INFO DAGScheduler: Submitting ResultStage 4 (MapPartitionsRDD[34] at start at NativeMethodAccessorImpl.java:0), which has no missing parents
18/03/23 11:08:19 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 16.9 KB, free 366.3 MB)
18/03/23 11:08:19 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 7.8 KB, free 366.3 MB)
18/03/23 11:08:19 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on 10.246.79.20:46249 (size: 7.8 KB, free: 366.3 MB)
18/03/23 11:08:19 INFO SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:1006
18/03/23 11:08:19 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 4 (MapPartitionsRDD[34] at start at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))
18/03/23 11:08:19 INFO TaskSchedulerImpl: Adding task set 4.0 with 1 tasks
18/03/23 11:08:19 INFO TaskSetManager: Starting task 0.0 in stage 4.0 (TID 4, localhost, executor driver, partition 0, PROCESS_LOCAL, 5001 bytes)
18/03/23 11:08:19 INFO Executor: Running task 0.0 in stage 4.0 (TID 4)
18/03/23 11:08:19 WARN CachedKafkaConsumer: CachedKafkaConsumer is not running in UninterruptibleThread. It may hang when CachedKafkaConsumer's methods are interrupted because of KAFKA-1894
18/03/23 11:08:19 INFO BufferedStreamThread: Loading required package: lattice
18/03/23 11:08:19 INFO BufferedStreamThread:
18/03/23 11:08:19 INFO BufferedStreamThread: Attaching package: ‘lattice’
18/03/23 11:08:19 INFO BufferedStreamThread:
18/03/23 11:08:19 INFO BufferedStreamThread: The following object is masked from ‘package:SparkR’:
18/03/23 11:08:19 INFO BufferedStreamThread:
18/03/23 11:08:19 INFO BufferedStreamThread: histogram
18/03/23 11:08:19 INFO BufferedStreamThread:
18/03/23 11:08:19 INFO BufferedStreamThread: Loading required package: ggplot2
18/03/23 11:08:21 INFO BufferedStreamThread: user system elapsed
18/03/23 11:08:21 INFO BufferedStreamThread: 1.931 0.071 2.002
18/03/23 11:08:21 INFO RRunner: Times: boot = 0.009 s, init = 0.017 s, broadcast = 0.000 s, read-input = 0.001 s, compute = 2.069 s, write-output = 0.002 s, total = 2.098 s
18/03/23 11:08:21 INFO Executor: Finished task 0.0 in stage 4.0 (TID 4). 1154 bytes result sent to driver
18/03/23 11:08:21 INFO TaskSetManager: Finished task 0.0 in stage 4.0 (TID 4) in 2122 ms on localhost (executor driver) (1/1)
18/03/23 11:08:21 INFO TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool
18/03/23 11:08:21 INFO DAGScheduler: ResultStage 4 (start at NativeMethodAccessorImpl.java:0) finished in 2.124 s
18/03/23 11:08:21 INFO DAGScheduler: Job 4 finished: start at NativeMethodAccessorImpl.java:0, took 2.135279 s
18/03/23 11:08:21 INFO StreamExecution: Streaming query made progress: {
"id" : "1605966f-51e3-4df9-b284-7535e37b6d44",
"runId" : "ab605bec-3d57-4a61-a45b-5174292ab2ec",
"name" : null,
"timestamp" : "2018-03-23T11:08:19.488Z",
"numInputRows" : 1,
"inputRowsPerSecond" : 40.0,
"processedRowsPerSecond" : 0.45004500450045004,
"durationMs" : {
"addBatch" : 2160,
"getBatch" : 8,
"getOffset" : 11,
"queryPlanning" : 8,
"triggerExecution" : 2222,
"walCommit" : 32
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KafkaSource[Subscribe[source]]",
"startOffset" : {
"source" : {
"0" : 4647073
}
},
"endOffset" : {
"source" : {
"0" : 4647074
}
},
"numInputRows" : 1,
"inputRowsPerSecond" : 40.0,
"processedRowsPerSecond" : 0.45004500450045004
} ],
"sink" : {
"description" : "KafkaSink"
}
}
18/03/23 11:08:21 INFO StreamExecution: Streaming query made progress: {
"id" : "1605966f-51e3-4df9-b284-7535e37b6d44",
"runId" : "ab605bec-3d57-4a61-a45b-5174292ab2ec",
"name" : null,
"timestamp" : "2018-03-23T11:08:21.747Z",
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"getOffset" : 12,
"triggerExecution" : 12
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KafkaSource[Subscribe[source]]",
"startOffset" : {
"source" : {
"0" : 4647074
}
},
"endOffset" : {
"source" : {
"0" : 4647074
}
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0
} ],
"sink" : {
"description" : "KafkaSink"
}
}
^C
Execution halted