This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master in repository
https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 82b267b Secrets Frontend (#2853) 82b267b is described below commit 82b267ba9d59fa677d5b4fdcc48e38de6c59dee9 Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Fri Oct 26 14:49:44 2018 -0700 Secrets Frontend (#2853) * Secrets Frontend * Do not expose secrets in cli --- .../pulsar/common/functions/FunctionConfig.java | 5 ++ .../org/apache/pulsar/common/io/SinkConfig.java | 5 ++ .../org/apache/pulsar/common/io/SourceConfig.java | 5 ++ .../instance/src/main/python/Function_pb2.py | 76 ++++++++++++---------- .../proto/src/main/proto/Function.proto | 1 + .../functions/utils/FunctionConfigUtils.java | 10 +++ .../pulsar/functions/utils/SinkConfigUtils.java | 8 +++ .../pulsar/functions/utils/SourceConfigUtils.java | 9 +++ 8 files changed, 85 insertions(+), 34 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java index ea5866a..2163df1 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java @@ -79,6 +79,11 @@ public class FunctionConfig { private ProcessingGuarantees processingGuarantees; private boolean retainOrdering; private Map<String, Object> userConfig; + // This is a map of secretName(aka how the secret is going to be + // accessed in the function via context) to an object that + // encapsulates how the secret is fetched by the underlying + // secrets provider + private Map<String, Object> secrets; private Runtime runtime; private boolean autoAck; private int maxMessageRetries = -1; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java index c4bf1ca..355c696 100644 --- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java @@ -55,6 +55,11 @@ public class SinkConfig { private Map<String, ConsumerConfig> inputSpecs = new TreeMap<>(); private Map<String, Object> configs; + // This is a map of secretName(aka how the secret is going to be + // accessed in the function via context) to an object that + // encapsulates how the secret is fetched by the underlying + // secrets provider + private Map<String, Object> secrets; private int parallelism = 1; private FunctionConfig.ProcessingGuarantees processingGuarantees; private boolean retainOrdering; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java index 57c0f79..9dbe97c 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java @@ -46,6 +46,11 @@ public class SourceConfig { private String schemaType; private Map<String, Object> configs; + // This is a map of secretName(aka how the secret is going to be + // accessed in the function via context) to an object that + // encapsulates how the secret is fetched by the underlying + // secrets provider + private Map<String, Object> secrets; private int parallelism = 1; private FunctionConfig.ProcessingGuarantees processingGuarantees; private Resources resources; diff --git a/pulsar-functions/instance/src/main/python/Function_pb2.py b/pulsar-functions/instance/src/main/python/Function_pb2.py index b09e105..d6cc8af 100644 --- a/pulsar-functions/instance/src/main/python/Function_pb2.py +++ b/pulsar-functions/instance/src/main/python/Function_pb2.py @@ -18,6 +18,7 @@ # # Generated by the protocol buffer compiler. DO NOT EDIT! +# Generated by the protocol buffer compiler. DO NOT EDIT! 
# source: Function.proto import sys @@ -39,7 +40,7 @@ DESCRIPTOR = _descriptor.FileDescriptor( name='Function.proto', package='proto', syntax='proto3', - serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"3\n\tResources\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x01\x12\x0b\n\x03ram\x18\x02 \x01(\x03\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x03\"B\n\x0cRetryDetails\x12\x19\n\x11maxMessageRetries\x18\x01 \x01(\x05\x12\x17\n\x0f\x64\x65\x61\x64LetterTopic\x18\x02 \x01(\t\"\xd4\x03\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x1 [...] + serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"3\n\tResources\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x01\x12\x0b\n\x03ram\x18\x02 \x01(\x03\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x03\"B\n\x0cRetryDetails\x12\x19\n\x11maxMessageRetries\x18\x01 \x01(\x05\x12\x17\n\x0f\x64\x65\x61\x64LetterTopic\x18\x02 \x01(\t\"\xe8\x03\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x1 [...] 
) _PROCESSINGGUARANTEES = _descriptor.EnumDescriptor( @@ -63,8 +64,8 @@ _PROCESSINGGUARANTEES = _descriptor.EnumDescriptor( ], containing_type=None, options=None, - serialized_start=1724, - serialized_end=1803, + serialized_start=1744, + serialized_end=1823, ) _sym_db.RegisterEnumDescriptor(_PROCESSINGGUARANTEES) @@ -86,8 +87,8 @@ _SUBSCRIPTIONTYPE = _descriptor.EnumDescriptor( ], containing_type=None, options=None, - serialized_start=1805, - serialized_end=1849, + serialized_start=1825, + serialized_end=1869, ) _sym_db.RegisterEnumDescriptor(_SUBSCRIPTIONTYPE) @@ -116,8 +117,8 @@ _FUNCTIONDETAILS_RUNTIME = _descriptor.EnumDescriptor( ], containing_type=None, options=None, - serialized_start=584, - serialized_end=615, + serialized_start=604, + serialized_end=635, ) _sym_db.RegisterEnumDescriptor(_FUNCTIONDETAILS_RUNTIME) @@ -262,56 +263,63 @@ _FUNCTIONDETAILS = _descriptor.Descriptor( is_extension=False, extension_scope=None, options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='runtime', full_name='proto.FunctionDetails.runtime', index=7, + name='secretsMap', full_name='proto.FunctionDetails.secretsMap', index=7, + number=16, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='runtime', full_name='proto.FunctionDetails.runtime', index=8, number=8, type=14, cpp_type=8, label=1, has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='autoAck', full_name='proto.FunctionDetails.autoAck', index=8, + name='autoAck', full_name='proto.FunctionDetails.autoAck', index=9, number=9, type=8, cpp_type=7, label=1, has_default_value=False, default_value=False, message_type=None, enum_type=None, 
containing_type=None, is_extension=False, extension_scope=None, options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='parallelism', full_name='proto.FunctionDetails.parallelism', index=9, + name='parallelism', full_name='proto.FunctionDetails.parallelism', index=10, number=10, type=5, cpp_type=1, label=1, has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='source', full_name='proto.FunctionDetails.source', index=10, + name='source', full_name='proto.FunctionDetails.source', index=11, number=11, type=11, cpp_type=10, label=1, has_default_value=False, default_value=None, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='sink', full_name='proto.FunctionDetails.sink', index=11, + name='sink', full_name='proto.FunctionDetails.sink', index=12, number=12, type=11, cpp_type=10, label=1, has_default_value=False, default_value=None, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='resources', full_name='proto.FunctionDetails.resources', index=12, + name='resources', full_name='proto.FunctionDetails.resources', index=13, number=13, type=11, cpp_type=10, label=1, has_default_value=False, default_value=None, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='packageUrl', full_name='proto.FunctionDetails.packageUrl', index=13, + name='packageUrl', full_name='proto.FunctionDetails.packageUrl', index=14, number=14, type=9, cpp_type=9, label=1, has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, 
is_extension=False, extension_scope=None, options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='retryDetails', full_name='proto.FunctionDetails.retryDetails', index=14, + name='retryDetails', full_name='proto.FunctionDetails.retryDetails', index=15, number=15, type=11, cpp_type=10, label=1, has_default_value=False, default_value=None, message_type=None, enum_type=None, containing_type=None, @@ -331,7 +339,7 @@ _FUNCTIONDETAILS = _descriptor.Descriptor( oneofs=[ ], serialized_start=147, - serialized_end=615, + serialized_end=635, ) @@ -375,8 +383,8 @@ _CONSUMERSPEC = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=617, - serialized_end=699, + serialized_start=637, + serialized_end=719, ) @@ -413,8 +421,8 @@ _SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=1053, - serialized_end=1114, + serialized_start=1073, + serialized_end=1134, ) _SOURCESPEC_INPUTSPECSENTRY = _descriptor.Descriptor( @@ -450,8 +458,8 @@ _SOURCESPEC_INPUTSPECSENTRY = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=1116, - serialized_end=1186, + serialized_start=1136, + serialized_end=1206, ) _SOURCESPEC = _descriptor.Descriptor( @@ -543,8 +551,8 @@ _SOURCESPEC = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=702, - serialized_end=1186, + serialized_start=722, + serialized_end=1206, ) @@ -616,8 +624,8 @@ _SINKSPEC = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=1189, - serialized_end=1334, + serialized_start=1209, + serialized_end=1354, ) @@ -654,8 +662,8 @@ _PACKAGELOCATIONMETADATA = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=1336, - serialized_end=1408, + serialized_start=1356, + serialized_end=1428, ) @@ -706,8 +714,8 @@ _FUNCTIONMETADATA = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=1411, - serialized_end=1572, + serialized_start=1431, 
+ serialized_end=1592, ) @@ -744,8 +752,8 @@ _INSTANCE = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=1574, - serialized_end=1655, + serialized_start=1594, + serialized_end=1675, ) @@ -782,8 +790,8 @@ _ASSIGNMENT = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=1657, - serialized_end=1722, + serialized_start=1677, + serialized_end=1742, ) _FUNCTIONDETAILS.fields_by_name['processingGuarantees'].enum_type = _PROCESSINGGUARANTEES diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto index 482d901..8d93764 100644 --- a/pulsar-functions/proto/src/main/proto/Function.proto +++ b/pulsar-functions/proto/src/main/proto/Function.proto @@ -57,6 +57,7 @@ message FunctionDetails { string logTopic = 5; ProcessingGuarantees processingGuarantees = 6; string userConfig = 7; + string secretsMap = 16; Runtime runtime = 8; bool autoAck = 9; int32 parallelism = 10; diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java index 0586a53..a7e45cc 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java @@ -188,6 +188,10 @@ public class FunctionConfigUtils { functionDetailsBuilder.setUserConfig(new Gson().toJson(configs)); } + if (functionConfig.getSecrets() != null && !functionConfig.getSecrets().isEmpty()) { + functionDetailsBuilder.setSecretsMap(new Gson().toJson(functionConfig.getSecrets())); + } + functionDetailsBuilder.setAutoAck(functionConfig.isAutoAck()); functionDetailsBuilder.setParallelism(functionConfig.getParallelism()); if (functionConfig.getResources() != null) { @@ -275,6 +279,12 @@ public class FunctionConfigUtils { } 
functionConfig.setUserConfig(userConfig); + if (!isEmpty(functionDetails.getSecretsMap())) { + Type type = new TypeToken<Map<String, Object>>() {}.getType(); + Map<String, Object> secretsMap = new Gson().fromJson(functionDetails.getSecretsMap(), type); + functionConfig.setSecrets(secretsMap); + } + if (functionDetails.hasResources()) { Resources resources = new Resources(); resources.setCpu(functionDetails.getResources().getCpu()); diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java index cea9dc7..4834db0 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java @@ -165,6 +165,9 @@ public class SinkConfigUtils { if (sinkConfig.getConfigs() != null) { sinkSpecBuilder.setConfigs(new Gson().toJson(sinkConfig.getConfigs())); } + if (sinkConfig.getSecrets() != null && !sinkConfig.getSecrets().isEmpty()) { + functionDetailsBuilder.setSecretsMap(new Gson().toJson(sinkConfig.getSecrets())); + } if (typeArg != null) { sinkSpecBuilder.setTypeClassName(typeArg); } @@ -228,6 +231,11 @@ public class SinkConfigUtils { Type type = new TypeToken<Map<String, String>>() {}.getType(); sinkConfig.setConfigs(new Gson().fromJson(functionDetails.getSink().getConfigs(), type)); } + if (!isEmpty(functionDetails.getSecretsMap())) { + Type type = new TypeToken<Map<String, Object>>() {}.getType(); + Map<String, Object> secretsMap = new Gson().fromJson(functionDetails.getSecretsMap(), type); + sinkConfig.setSecrets(secretsMap); + } if (functionDetails.hasResources()) { Resources resources = new Resources(); resources.setCpu(functionDetails.getResources().getCpu()); diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java index 8956ff4..8d7c1d0 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java @@ -98,6 +98,10 @@ public class SourceConfigUtils { sourceSpecBuilder.setConfigs(new Gson().toJson(sourceConfig.getConfigs())); } + if (sourceConfig.getSecrets() != null && !sourceConfig.getSecrets().isEmpty()) { + functionDetailsBuilder.setSecretsMap(new Gson().toJson(sourceConfig.getSecrets())); + } + if (typeArg != null) { sourceSpecBuilder.setTypeClassName(typeArg); } @@ -156,6 +160,11 @@ public class SourceConfigUtils { Type type = new TypeToken<Map<String, String>>() {}.getType(); sourceConfig.setConfigs(new Gson().fromJson(sourceSpec.getConfigs(), type)); } + if (!isEmpty(functionDetails.getSecretsMap())) { + Type type = new TypeToken<Map<String, Object>>() {}.getType(); + Map<String, Object> secretsMap = new Gson().fromJson(functionDetails.getSecretsMap(), type); + sourceConfig.setSecrets(secretsMap); + } Function.SinkSpec sinkSpec = functionDetails.getSink(); sourceConfig.setTopicName(sinkSpec.getTopic()); if (!StringUtils.isEmpty(sinkSpec.getSchemaType())) {