[ https://issues.apache.org/jira/browse/SPARK-23288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-23288:
------------------------------------

    Assignee:     (was: Apache Spark)

> Incorrect number of written records in structured streaming
> -----------------------------------------------------------
>
>                 Key: SPARK-23288
>                 URL: https://issues.apache.org/jira/browse/SPARK-23288
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL, Structured Streaming
>    Affects Versions: 2.2.0
>            Reporter: Yuriy Bondaruk
>            Priority: Major
>              Labels: Metrics, metrics
>
> I'm using SparkListener.onTaskEnd() to capture input and output metrics, but
> the number of written records
> ('taskEnd.taskMetrics().outputMetrics().recordsWritten()') appears to be
> incorrect. Here is my stream construction:
>
> {code:java}
> StreamingQuery writeStream = session
>     .readStream()
>     .schema(RecordSchema.fromClass(TestRecord.class))
>     .option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
>     .option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
>     .csv(inputFolder.getRoot().toPath().toString())
>     .as(Encoders.bean(TestRecord.class))
>     .flatMap(
>         ((FlatMapFunction<TestRecord, TestVendingRecord>) (u) -> {
>             List<TestVendingRecord> resultIterable = new ArrayList<>();
>             try {
>                 TestVendingRecord result = transformer.convert(u);
>                 resultIterable.add(result);
>             } catch (Throwable t) {
>                 System.err.println("Ooops");
>                 t.printStackTrace();
>             }
>             return resultIterable.iterator();
>         }),
>         Encoders.bean(TestVendingRecord.class))
>     .writeStream()
>     .outputMode(OutputMode.Append())
>     .format("parquet")
>     .option("path", outputFolder.getRoot().toPath().toString())
>     .option("checkpointLocation",
>         checkpointFolder.getRoot().toPath().toString())
>     .start();
> writeStream.processAllAvailable();
> writeStream.stop();
> {code}
> I tested it with one good and one bad input record (the bad one throws an
> exception in transformer.convert(u)), and it produces the following metrics:
>
> {code:java}
> (TestMain.java:onTaskEnd(73)) - -----------status--> SUCCESS
> (TestMain.java:onTaskEnd(75)) - -----------recordsWritten--> 0
> (TestMain.java:onTaskEnd(76)) - -----------recordsRead-----> 2
> (TestMain.java:onTaskEnd(83)) - taskEnd.taskInfo().accumulables():
> (TestMain.java:onTaskEnd(84)) - name = duration total (min, med, max)
> (TestMain.java:onTaskEnd(85)) - value = 323
> (TestMain.java:onTaskEnd(84)) - name = number of output rows
> (TestMain.java:onTaskEnd(85)) - value = 2
> (TestMain.java:onTaskEnd(84)) - name = duration total (min, med, max)
> (TestMain.java:onTaskEnd(85)) - value = 364
> (TestMain.java:onTaskEnd(84)) - name = internal.metrics.input.recordsRead
> (TestMain.java:onTaskEnd(85)) - value = 2
> (TestMain.java:onTaskEnd(84)) - name = internal.metrics.input.bytesRead
> (TestMain.java:onTaskEnd(85)) - value = 157
> (TestMain.java:onTaskEnd(84)) - name = internal.metrics.resultSerializationTime
> (TestMain.java:onTaskEnd(85)) - value = 3
> (TestMain.java:onTaskEnd(84)) - name = internal.metrics.resultSize
> (TestMain.java:onTaskEnd(85)) - value = 2396
> (TestMain.java:onTaskEnd(84)) - name = internal.metrics.executorCpuTime
> (TestMain.java:onTaskEnd(85)) - value = 633807000
> (TestMain.java:onTaskEnd(84)) - name = internal.metrics.executorRunTime
> (TestMain.java:onTaskEnd(85)) - value = 683
> (TestMain.java:onTaskEnd(84)) - name = internal.metrics.executorDeserializeCpuTime
> (TestMain.java:onTaskEnd(85)) - value = 55662000
> (TestMain.java:onTaskEnd(84)) - name = internal.metrics.executorDeserializeTime
> (TestMain.java:onTaskEnd(85)) - value = 58
> (TestMain.java:onTaskEnd(89)) - input records 2
> Streaming query made progress: {
>   "id" : "1231f9cb-b2e8-4d10-804d-73d7826c1cb5",
>   "runId" : "bd23b60c-93f9-4e17-b3bc-55403edce4e7",
>   "name" : null,
>   "timestamp" : "2018-01-26T14:44:05.362Z",
>   "numInputRows" : 2,
>   "processedRowsPerSecond" : 0.8163265306122448,
>   "durationMs" : {
>     "addBatch" : 1994,
>     "getBatch" : 126,
>     "getOffset" : 52,
>     "queryPlanning" : 220,
>     "triggerExecution" : 2450,
>     "walCommit" : 41
>   },
>   "stateOperators" : [ ],
>   "sources" : [ {
>     "description" : "FileStreamSource[file:/var/folders/4w/zks_kfls2s3glmrj3f725p7hllyb5_/T/junit3661035412295337071]",
>     "startOffset" : null,
>     "endOffset" : {
>       "logOffset" : 0
>     },
>     "numInputRows" : 2,
>     "processedRowsPerSecond" : 0.8163265306122448
>   } ],
>   "sink" : {
>     "description" : "FileSink[/var/folders/4w/zks_kfls2s3glmrj3f725p7hllyb5_/T/junit3785605384928624065]"
>   }
> }
> {code}
> The number of input records is correct, but the number of output records
> taken from taskEnd.taskMetrics().outputMetrics().recordsWritten() is zero.
> The accumulables (taskEnd.taskInfo().accumulables()) are not correct either:
> 'number of output rows' should be 1, but it shows 2.
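
The expected count of 1 follows from the flatMap body in the report: a record whose conversion throws contributes an empty iterator, so only the good record should reach the sink. The following is a minimal plain-Java sketch of that reasoning, without Spark. The class name, the `convert` stand-in for `transformer.convert(u)`, and the sample records are all hypothetical and not taken from the reporter's project.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class ExpectedOutputCount {

    // Hypothetical stand-in for transformer.convert(u): rejects records
    // that start with "bad" and upper-cases everything else.
    static String convert(String record) {
        if (record.startsWith("bad")) {
            throw new IllegalArgumentException("cannot convert: " + record);
        }
        return record.toUpperCase();
    }

    // Mirrors the flatMap body in the report: one result on success,
    // an empty iterator when the conversion throws.
    static Iterator<String> flatMapOne(String record) {
        List<String> results = new ArrayList<>();
        try {
            results.add(convert(record));
        } catch (RuntimeException e) {
            // The failure is swallowed, so this record yields no output.
        }
        return results.iterator();
    }

    // Counts how many records would be handed to the sink.
    static long countOutputs(List<String> inputs) {
        long written = 0;
        for (String input : inputs) {
            Iterator<String> it = flatMapOne(input);
            while (it.hasNext()) {
                it.next();
                written++;
            }
        }
        return written;
    }

    public static void main(String[] args) {
        // One good and one bad record, as in the test described above.
        List<String> inputs = Arrays.asList("good\t1", "bad\t2");
        System.out.println("input records = " + inputs.size());
        System.out.println("expected output records = " + countOutputs(inputs));
    }
}
```

Under these assumptions the sketch reports 2 input records and 1 expected output record, which is the value the reporter expects from 'number of output rows' rather than the 2 that was observed.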