This is an automated email from the ASF dual-hosted git repository. srinivasulu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push: new 3b8dc03 Propogating the system messages to the stream. (#937) 3b8dc03 is described below commit 3b8dc03b22bf37aeee192190daf26653de5d61ea Author: Srinivasulu Punuru <srinipun...@users.noreply.github.com> AuthorDate: Mon Mar 4 16:21:51 2019 -0800 Propogating the system messages to the stream. (#937) * Adding system messages to the stream * checkstyle fixes --- .../samza/sql/data/SamzaSqlRelMsgMetadata.java | 24 +++++- .../sql/runner/SamzaSqlApplicationConfig.java | 8 ++ .../samza/sql/translator/ProjectTranslator.java | 1 - .../samza/sql/translator/QueryTranslator.java | 12 +++ .../samza/sql/translator/ScanTranslator.java | 98 +++++++++++++++------- .../TranslatorInputMetricsMapFunction.java | 3 +- .../TranslatorOutputMetricsMapFunction.java | 2 +- .../samza/test/samzasql/TestSamzaSqlEndToEnd.java | 25 +++++- 8 files changed, 133 insertions(+), 40 deletions(-) diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMsgMetadata.java b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMsgMetadata.java index 713ecbe..14f2892 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMsgMetadata.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMsgMetadata.java @@ -20,6 +20,8 @@ package org.apache.samza.sql.data; import java.io.Serializable; +import java.time.Instant; +import org.codehaus.jackson.annotate.JsonIgnore; import org.codehaus.jackson.annotate.JsonProperty; @@ -36,9 +38,16 @@ public class SamzaSqlRelMsgMetadata implements Serializable { public boolean isNewInputMessage = true; /** - * + * Indicates whether the SamzaSqlMessage is a system message or not. */ - public String operatorBeginProcessingInstant = null; + @JsonIgnore + private boolean isSystemMessage = false; + + /** + * Time at which the join operation started for the message. + * If there is no join node in the operator graph, this will be -1. + */ + public long joinStartTimeMs = -1; /** @@ -93,7 +102,6 @@ public class SamzaSqlRelMsgMetadata implements Serializable { public boolean hasArrivalTime() { return arrivalTime != null && !arrivalTime.isEmpty(); } - @JsonProperty("scanTime") public String getscanTime() { return scanTime;} @@ -103,6 +111,16 @@ public class SamzaSqlRelMsgMetadata implements Serializable { public boolean hasScanTime() { return scanTime != null && !scanTime.isEmpty(); } + @JsonIgnore + public void setIsSystemMessage(boolean isSystemMessage) { + this.isSystemMessage = isSystemMessage; + } + + @JsonIgnore + public boolean isSystemMessage() { + return isSystemMessage; + } + @Override public String toString() { return "[Metadata:{" + eventTime + " " + arrivalTime + " " + scanTime + "}]"; diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java index 4883dfb..d521681 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java @@ -89,6 +89,7 @@ public class SamzaSqlApplicationConfig { public static final String CFG_METADATA_TOPIC_PREFIX = "samza.sql.metadataTopicPrefix"; public static final String CFG_GROUPBY_WINDOW_DURATION_MS = "samza.sql.groupby.window.ms"; + public static final String CFG_SQL_PROCESS_SYSTEM_EVENTS = "samza.sql.processSystemEvents"; public static final String SAMZA_SYSTEM_LOG = "log"; @@ -115,6 +116,7 @@ public class SamzaSqlApplicationConfig { private final String metadataTopicPrefix; private final long windowDurationMs; + private final boolean processSystemEvents; public SamzaSqlApplicationConfig(Config staticConfig, List<String> inputSystemStreams, List<String> outputSystemStreams) { @@ -165,6 +167,8 @@ public class SamzaSqlApplicationConfig { metadataTopicPrefix = staticConfig.get(CFG_METADATA_TOPIC_PREFIX, DEFAULT_METADATA_TOPIC_PREFIX); + + processSystemEvents = staticConfig.getBoolean(CFG_SQL_PROCESS_SYSTEM_EVENTS, true); windowDurationMs = staticConfig.getLong(CFG_GROUPBY_WINDOW_DURATION_MS, DEFAULT_GROUPBY_WINDOW_DURATION_MS); } @@ -324,4 +328,8 @@ public class SamzaSqlApplicationConfig { public long getWindowDurationMs() { return windowDurationMs; } + + public boolean isProcessSystemEvents() { + return processSystemEvents; + } } diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java index 3378788..6e6ff45 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java @@ -193,5 +193,4 @@ class ProjectTranslator { context.registerMessageStream(project.getId(), outputStream); context.registerRelNode(project.getId(), project); } - } diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java index d3c8fa9..ce4737c 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java @@ -50,6 +50,7 @@ import org.apache.samza.metrics.SamzaHistogram; import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OutputStream; +import org.apache.samza.operators.functions.FilterFunction; import org.apache.samza.operators.functions.MapFunction; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.NoOpSerde; @@ -293,6 +294,17 @@ public class QueryTranslator { GenericOutputDescriptor<KV<Object, Object>> osd = sd.getOutputDescriptor(sinkConfig.getStreamId(), noOpKVSerde); OutputStream stm = outputMsgStreams.computeIfAbsent(sinkConfig.getSource(), v -> appDesc.getOutputStream(osd)); outputStream.sendTo(stm); + + // Process system events only if the output is a stream. + if (sqlConfig.isProcessSystemEvents()) { + for( MessageStream<SamzaSqlInputMessage> inputStream : inputMsgStreams.values()) { + MessageStream<KV<Object, Object>> systemEventStream = + inputStream.filter(message -> message.getMetadata().isSystemMessage()) + .map(SamzaSqlInputMessage::getKeyAndMessageKV); + + systemEventStream.sendTo(stm); + } + } } else { Table outputTable = appDesc.getTable(tableDescriptor.get()); if (outputTable == null) { diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java index e044f6f..5fa04d8 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java @@ -1,21 +1,21 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.samza.sql.translator; @@ -36,8 +36,8 @@ import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.functions.FilterFunction; import org.apache.samza.operators.functions.MapFunction; import org.apache.samza.serializers.NoOpSerde; -import org.apache.samza.sql.SamzaSqlInputTransformer; import org.apache.samza.sql.SamzaSqlInputMessage; +import org.apache.samza.sql.SamzaSqlInputTransformer; import org.apache.samza.sql.data.SamzaSqlRelMessage; import org.apache.samza.sql.interfaces.SamzaRelConverter; import org.apache.samza.sql.interfaces.SqlIOConfig; @@ -48,6 +48,7 @@ import org.apache.samza.system.descriptors.InputTransformer; import org.apache.samza.table.descriptors.CachingTableDescriptor; import org.apache.samza.table.descriptors.RemoteTableDescriptor; + /** * Translator to translate the TableScans in relational graph to the corresponding input streams in the StreamGraph * implementation @@ -78,7 +79,7 @@ class ScanTranslator { @Override public boolean apply(SamzaSqlInputMessage samzaSqlInputMessage) { - return !relConverter.isSystemMessage(samzaSqlInputMessage.getKeyAndMessageKV()); + return !samzaSqlInputMessage.getMetadata().isSystemMessage(); } } @@ -147,11 +148,11 @@ class ScanTranslator { queryInputEvents.inc(); processingTime.update(Duration.between(startProcessing, endProcessing).toMillis()); } - } // ScanMapFunction - void translate(final TableScan tableScan, final String queryLogicalId, final String logicalOpId, final TranslatorContext context, - Map<String, DelegatingSystemDescriptor> systemDescriptors, Map<String, MessageStream<SamzaSqlInputMessage>> inputMsgStreams) { + void translate(final TableScan tableScan, final String queryLogicalId, final String logicalOpId, + final TranslatorContext context, Map<String, DelegatingSystemDescriptor> systemDescriptors, + Map<String, MessageStream<SamzaSqlInputMessage>> inputMsgStreams) { StreamApplicationDescriptor streamAppDesc = context.getStreamAppDescriptor(); List<String> tableNameParts = tableScan.getTable().getQualifiedName(); String sourceName = SqlIOConfig.getSourceFromSourceParts(tableNameParts); @@ -162,9 +163,9 @@ class ScanTranslator { final String streamId = sqlIOConfig.getStreamId(); final String source = sqlIOConfig.getSource(); - final boolean isRemoteTable = sqlIOConfig.getTableDescriptor().isPresent() && - (sqlIOConfig.getTableDescriptor().get() instanceof RemoteTableDescriptor || - sqlIOConfig.getTableDescriptor().get() instanceof CachingTableDescriptor); + final boolean isRemoteTable = sqlIOConfig.getTableDescriptor().isPresent() && ( + sqlIOConfig.getTableDescriptor().get() instanceof RemoteTableDescriptor || sqlIOConfig.getTableDescriptor() + .get() instanceof CachingTableDescriptor); // For remote table, we don't have an input stream descriptor. The table descriptor is already defined by the // SqlIOResolverFactory. @@ -181,22 +182,55 @@ class ScanTranslator { systemDescriptors.put(systemName, systemDescriptor); } else { /* in SamzaSQL, there should be no systemDescriptor setup by user, so this branch happens only - * in case of Fan-OUT (i.e., same input stream used in multiple sql statements), or when same input - * used twice in same sql statement (e.g., select ... from input as i1, input as i2 ...), o.w., throw error */ + * in case of Fan-OUT (i.e., same input stream used in multiple sql statements), or when same input + * used twice in same sql statement (e.g., select ... from input as i1, input as i2 ...), o.w., throw error */ if (systemDescriptor.getTransformer().isPresent()) { InputTransformer existingTransformer = systemDescriptor.getTransformer().get(); if (!(existingTransformer instanceof SamzaSqlInputTransformer)) { - throw new SamzaException("SamzaSQL Exception: existing transformer for " + systemName + " is not SamzaSqlInputTransformer"); + throw new SamzaException( + "SamzaSQL Exception: existing transformer for " + systemName + " is not SamzaSqlInputTransformer"); } } } InputDescriptor inputDescriptor = systemDescriptor.getInputDescriptor(streamId, new NoOpSerde<>()); - MessageStream<SamzaSqlRelMessage> samzaSqlRelMessageStream = - inputMsgStreams.computeIfAbsent(source, v -> streamAppDesc.getInputStream(inputDescriptor)) - .filter(new FilterSystemMessageFunction(sourceName, queryId)) - .map(new ScanMapFunction(sourceName, queryId, queryLogicalId, logicalOpId)); + + if (!inputMsgStreams.containsKey(source)) { + MessageStream<SamzaSqlInputMessage> inputMsgStream = streamAppDesc.getInputStream(inputDescriptor); + inputMsgStreams.put(source, inputMsgStream.map(new SystemMessageMapperFunction(source, queryId))); + } + MessageStream<SamzaSqlRelMessage> samzaSqlRelMessageStream = inputMsgStreams.get(source) + .filter(new FilterSystemMessageFunction(sourceName, queryId)) + .map(new ScanMapFunction(sourceName, queryId, queryLogicalId, logicalOpId)); context.registerMessageStream(tableScan.getId(), samzaSqlRelMessageStream); } + + /** + * Function that populates whether the message is a system message. + * TODO This should ideally be populated by the InputTransformer in future. + */ + private static class SystemMessageMapperFunction implements MapFunction<SamzaSqlInputMessage, SamzaSqlInputMessage> { + private final String source; + private final int queryId; + private transient SamzaRelConverter relConverter; + + public SystemMessageMapperFunction(String source, int queryId) { + this.source = source; + this.queryId = queryId; + } + + @Override + public void init(Context context) { + TranslatorContext translatorContext = + ((SamzaSqlApplicationContext) context.getApplicationTaskContext()).getTranslatorContexts().get(queryId); + relConverter = translatorContext.getMsgConverter(source); + } + + @Override + public SamzaSqlInputMessage apply(SamzaSqlInputMessage message) { + message.getMetadata().setIsSystemMessage(relConverter.isSystemMessage(message.getKeyAndMessageKV())); + return message; + } + } } diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorInputMetricsMapFunction.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorInputMetricsMapFunction.java index bb3300a..ef3028e 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorInputMetricsMapFunction.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorInputMetricsMapFunction.java @@ -19,7 +19,6 @@ package org.apache.samza.sql.translator; -import com.google.common.annotations.VisibleForTesting; import java.time.Instant; import org.apache.samza.context.ContainerContext; import org.apache.samza.context.Context; @@ -60,7 +59,7 @@ class TranslatorInputMetricsMapFunction implements MapFunction<SamzaSqlRelMessag @Override public SamzaSqlRelMessage apply(SamzaSqlRelMessage message) { inputEvents.inc(); - message.getSamzaSqlRelMsgMetadata().operatorBeginProcessingInstant = Instant.now().toString(); + message.getSamzaSqlRelMsgMetadata().joinStartTimeMs = Instant.now().toEpochMilli(); return message; } diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorOutputMetricsMapFunction.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorOutputMetricsMapFunction.java index f1757fb..3e85bed 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorOutputMetricsMapFunction.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorOutputMetricsMapFunction.java @@ -63,7 +63,7 @@ class TranslatorOutputMetricsMapFunction implements MapFunction<SamzaSqlRelMessa @Override public SamzaSqlRelMessage apply(SamzaSqlRelMessage message) { Instant endProcessing = Instant.now(); - Instant beginProcessing = Instant.parse(message.getSamzaSqlRelMsgMetadata().operatorBeginProcessingInstant); + Instant beginProcessing = Instant.ofEpochMilli(message.getSamzaSqlRelMsgMetadata().joinStartTimeMs); outputEvents.inc(); processingTime.update(Duration.between(beginProcessing, endProcessing).toMillis()); return message; diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java index e69ae9a..e0264ee 100644 --- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java +++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java @@ -107,6 +107,29 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness { .map(x -> Integer.valueOf(((GenericRecord) x.getMessage()).get("id").toString())) .sorted() .collect(Collectors.toList()); + Assert.assertEquals(numMessages, outMessages.size()); + } + + @Test + public void testEndToEndDisableSystemMessages() { + int numMessages = 20; + + TestAvroSystemFactory.messages.clear(); + Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages); + String avroSamzaToRelMsgConverterDomain = + String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, "avro"); + staticConfigs.put(avroSamzaToRelMsgConverterDomain + SamzaSqlApplicationConfig.CFG_FACTORY, + SampleRelConverterFactory.class.getName()); + String sql = "Insert into testavro.simpleOutputTopic select * from testavro.SIMPLE1"; + List<String> sqlStmts = Arrays.asList(sql); + staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); + staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_PROCESS_SYSTEM_EVENTS, "false"); + runApplication(new MapConfig(staticConfigs)); + + List<Integer> outMessages = TestAvroSystemFactory.messages.stream() + .map(x -> Integer.valueOf(((GenericRecord) x.getMessage()).get("id").toString())) + .sorted() + .collect(Collectors.toList()); Assert.assertEquals((numMessages + 1) / 2, outMessages.size()); } @@ -174,7 +197,7 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness { Assert.assertEquals(numMessages, outMessagesSet.size()); Assert.assertTrue(IntStream.range(0, numMessages).boxed().collect(Collectors.toList()).equals(new ArrayList<>(outMessagesSet))); } - + @Test public void testEndToEndMultiSqlStmtsWithSameSystemStreamAsInputAndOutput() { int numMessages = 20;