This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new c4553ddc46 [INLONG-9202][Sort] Fix audit report error when running pulsar -> iceberg in flink1.15 (#9206)
c4553ddc46 is described below
commit c4553ddc46f455e8efa8a3b286cf2783dfcc58bd
Author: Sting <[email protected]>
AuthorDate: Fri Nov 3 15:21:01 2023 +0800
[INLONG-9202][Sort] Fix audit report error when running pulsar -> iceberg in flink1.15 (#9206)
---
.../org/apache/inlong/audit/AuditOperator.java | 4 ++-
.../protocol/node/extract/PulsarExtractNode.java | 31 +++++++++++++++++++++-
.../sort/protocol/node/load/IcebergLoadNode.java | 16 ++++++++++-
inlong-sort/sort-core/pom.xml | 6 +++++
.../org/apache/inlong/sort/base/Constants.java | 8 ++++++
.../sort-connectors/pulsar/pom.xml | 2 --
.../table/PulsarTableDeserializationSchema.java | 10 +++++--
.../PulsarTableDeserializationSchemaFactory.java | 6 +----
.../formats/inlongmsg/InLongMsgDecodingFormat.java | 13 +++++++++
.../sort/formats/inlongmsg/InLongMsgUtils.java | 3 +++
10 files changed, 87 insertions(+), 12 deletions(-)
diff --git
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java
index a89c30f6c0..e720a75cb4 100644
---
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java
+++
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java
@@ -26,6 +26,7 @@ import org.apache.inlong.audit.util.StatInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashMap;
@@ -42,8 +43,9 @@ import static
org.apache.inlong.audit.protocol.AuditApi.BaseCommand.Type.AUDIT_R
/**
* Audit operator, which is singleton.
*/
-public class AuditOperator {
+public class AuditOperator implements Serializable {
+ private static final long serialVersionUID = 1L;
private static final Logger LOGGER =
LoggerFactory.getLogger(AuditOperator.class);
private static final String FIELD_SEPARATORS = ":";
private static final String DEFAULT_AUDIT_TAG = "-1";
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java
index cba35f8683..d6634013da 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java
@@ -17,8 +17,10 @@
package org.apache.inlong.sort.protocol.node.extract;
+import org.apache.inlong.common.enums.MetaField;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.InlongMetric;
+import org.apache.inlong.sort.protocol.Metadata;
import org.apache.inlong.sort.protocol.node.ExtractNode;
import org.apache.inlong.sort.protocol.node.format.Format;
import org.apache.inlong.sort.protocol.transformation.WatermarkField;
@@ -34,13 +36,15 @@ import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import java.util.EnumSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
@EqualsAndHashCode(callSuper = true)
@JsonTypeName("pulsarExtract")
@Data
-public class PulsarExtractNode extends ExtractNode implements InlongMetric {
+public class PulsarExtractNode extends ExtractNode implements InlongMetric, Metadata {
private static final long serialVersionUID = 1L;
@@ -136,4 +140,29 @@ public class PulsarExtractNode extends ExtractNode
implements InlongMetric {
public List<FieldInfo> getPartitionFields() {
return super.getPartitionFields();
}
+
+ @Override
+ public String getMetadataKey(MetaField metaField) {
+ String metadataKey;
+ switch (metaField) {
+ case AUDIT_DATA_TIME:
+ metadataKey = "value.data-time";
+ break;
+ default:
+ throw new UnsupportedOperationException(String.format("Unsupport meta field for %s: %s",
+ this.getClass().getSimpleName(), metaField));
+ }
+ return metadataKey;
+ }
+
+ @Override
+ public boolean isVirtual(MetaField metaField) {
+ return true;
+ }
+
+ @Override
+ public Set<MetaField> supportedMetaFields() {
+ return EnumSet.of(MetaField.AUDIT_DATA_TIME);
+ }
+
}
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
index d39114c3b7..6ffaf5f2da 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
@@ -132,9 +132,23 @@ public class IcebergLoadNode extends LoadNode implements
InlongMetric, Metadata,
return super.getPartitionFields();
}
+ @Override
+ public String getMetadataKey(MetaField metaField) {
+ String metadataKey;
+ switch (metaField) {
+ case AUDIT_DATA_TIME:
+ metadataKey = "audit_data_time";
+ break;
+ default:
+ throw new UnsupportedOperationException(String.format("Unsupport meta field for %s: %s",
+ this.getClass().getSimpleName(), metaField));
+ }
+ return metadataKey;
+ }
+
@Override
public boolean isVirtual(MetaField metaField) {
- return true;
+ return false;
}
@Override
diff --git a/inlong-sort/sort-core/pom.xml b/inlong-sort/sort-core/pom.xml
index 7228bfb59c..998905c038 100644
--- a/inlong-sort/sort-core/pom.xml
+++ b/inlong-sort/sort-core/pom.xml
@@ -293,6 +293,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-connector-iceberg-v1.15</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index 5065c78c1f..ac7baac4dd 100644
---
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -421,4 +421,12 @@ public final class Constants {
.booleanType()
.defaultValue(false)
.withDescription("Whether supporting auto create table when snapshot, default value is 'false'");
+
+ public static final ConfigOption<String> INNER_FORMAT =
+ ConfigOptions.key("inlong-msg.inner.format")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Inner format");
+
}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml
index d2b24382a6..7f4afdd848 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml
@@ -89,8 +89,6 @@
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
- <scope>provided</scope>
- <optional>true</optional>
</dependency>
<!-- Pulsar Client -->
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java
index 237de45b4d..4792d7b287 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java
@@ -17,6 +17,7 @@
package org.apache.inlong.sort.pulsar.table;
+import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricsCollector;
import org.apache.inlong.sort.base.metric.SourceMetricData;
@@ -58,13 +59,15 @@ public class PulsarTableDeserializationSchema implements
PulsarDeserializationSc
private SourceMetricData sourceMetricData;
+ private MetricOption metricOption;
+
public PulsarTableDeserializationSchema(
@Nullable DeserializationSchema<RowData> keyDeserialization,
DeserializationSchema<RowData> valueDeserialization,
TypeInformation<RowData> producedTypeInfo,
PulsarRowDataConverter rowDataConverter,
boolean upsertMode,
- SourceMetricData sourceMetricData) {
+ MetricOption metricOption) {
if (upsertMode) {
checkNotNull(keyDeserialization, "upsert mode must specify a key format");
}
@@ -73,7 +76,7 @@ public class PulsarTableDeserializationSchema implements
PulsarDeserializationSc
this.rowDataConverter = checkNotNull(rowDataConverter);
this.producedTypeInfo = checkNotNull(producedTypeInfo);
this.upsertMode = upsertMode;
- this.sourceMetricData = sourceMetricData;
+ this.metricOption = metricOption;
}
@Override
@@ -82,6 +85,9 @@ public class PulsarTableDeserializationSchema implements
PulsarDeserializationSc
if (keyDeserialization != null) {
keyDeserialization.open(context);
}
+ if (metricOption != null) {
+ sourceMetricData = new SourceMetricData(metricOption);
+ }
valueDeserialization.open(context);
}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchemaFactory.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchemaFactory.java
index 360b07aa69..c063e26539 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchemaFactory.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchemaFactory.java
@@ -172,17 +172,13 @@ public class PulsarTableDeserializationSchemaFactory
implements Serializable {
.withAuditKeys(auditKeys)
.build();
- if (metricOption != null) {
- sourceMetricData = new SourceMetricData(metricOption);
- }
-
return new PulsarTableDeserializationSchema(
keyDeserialization,
valueDeserialization,
producedTypeInfo,
rowDataConverter,
upsertMode,
- sourceMetricData);
+ metricOption);
}
public void setProducedDataType(DataType producedDataType) {
diff --git
a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java
b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java
index 4645290c1b..0f67bbc072 100644
---
a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java
+++
b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java
@@ -139,6 +139,19 @@ public class InLongMsgDecodingFormat implements
DecodingFormat<DeserializationSc
enum ReadableMetadata {
+ DATA_TIME(
+ "data-time",
+ DataTypes.BIGINT().notNull(),
+ new MetadataConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object read(InLongMsgHead head) {
+ return head.getTime().getTime();
+ }
+ }),
+
CREATE_TIME(
"create-time",
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE().notNull(),
diff --git
a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
index 16568d00c3..371ce84808 100644
---
a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
+++
b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
@@ -54,6 +54,7 @@ public class InLongMsgUtils {
// keys in attributes
public static final String INLONGMSG_ATTR_STREAM_ID = "streamId";
+ public static final String INLONGMSG_ATTR_TID = "tid";
public static final String INLONGMSG_ATTR_TIME_T = "t";
public static final String INLONGMSG_ATTR_TIME_DT = "dt";
public static final String INLONGMSG_ATTR_ADD_COLUMN_PREFIX = "__addcol";
@@ -118,6 +119,8 @@ public class InLongMsgUtils {
if (attributes.containsKey(INLONGMSG_ATTR_STREAM_ID)) {
streamId = attributes.get(INLONGMSG_ATTR_STREAM_ID);
+ } else if (attributes.containsKey(INLONGMSG_ATTR_TID)) {
+ streamId = attributes.get(INLONGMSG_ATTR_TID);
} else {
throw new IllegalArgumentException("Could not find " +
INLONGMSG_ATTR_STREAM_ID + " in attributes!");
}