[
https://issues.apache.org/jira/browse/BEAM-6783?focusedWorklogId=284547&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-284547
]
ASF GitHub Bot logged work on BEAM-6783:
----------------------------------------
Author: ASF GitHub Bot
Created on: 29/Jul/19 21:06
Start Date: 29/Jul/19 21:06
Worklog Time Spent: 10m
Work Description: kanterov commented on pull request #9169: [BEAM-6783]
byte[] breaks in BeamSQL codegen
URL: https://github.com/apache/beam/pull/9169#discussion_r308437578
##########
File path:
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
##########
@@ -372,60 +375,115 @@ private InputGetterImpl(Expression input, Schema
inputSchema) {
@Override
public Expression field(BlockBuilder list, int index, Type storageType) {
- if (index >= inputSchema.getFieldCount() || index < 0) {
- throw new IllegalArgumentException("Unable to find field #" + index);
+ return value(list, index, storageType, input, inputSchema);
+ }
+
+ private static Expression value(
+ BlockBuilder list, int index, Type storageType, Expression input,
Schema schema) {
+ if (index >= schema.getFieldCount() || index < 0) {
+ throw new IllegalArgumentException("Unable to find value #" + index);
}
- final Expression expression = list.append("current", input);
+ final Expression expression = list.append(list.newName("current"),
input);
if (storageType == Object.class) {
return Expressions.convert_(
Expressions.call(expression, "getValue",
Expressions.constant(index)), Object.class);
}
- FieldType fromType = inputSchema.getField(index).getType();
+ FieldType fromType = schema.getField(index).getType();
String getter;
if (fromType.getTypeName().isLogicalType()) {
- getter =
logicalTypeGetterMap.get(fromType.getLogicalType().getIdentifier());
+ getter =
LOGICAL_TYPE_GETTER_MAP.get(fromType.getLogicalType().getIdentifier());
} else {
- getter = typeGetterMap.get(fromType.getTypeName());
+ getter = TYPE_GETTER_MAP.get(fromType.getTypeName());
}
if (getter == null) {
throw new IllegalArgumentException("Unable to get " +
fromType.getTypeName());
}
- Expression field = Expressions.call(expression, getter,
Expressions.constant(index));
- if (fromType.getTypeName().isLogicalType()) {
- Expression millisField = Expressions.call(field, "getMillis");
- String logicalId = fromType.getLogicalType().getIdentifier();
+
+ Expression value = Expressions.call(expression, getter,
Expressions.constant(index));
+
+ return value(value, fromType);
+ }
+
+ private static Expression value(Expression value, Schema.FieldType type) {
+ if (type.getTypeName().isLogicalType()) {
+ Expression millisField = Expressions.call(value, "getMillis");
+ String logicalId = type.getLogicalType().getIdentifier();
if (logicalId.equals(TimeType.IDENTIFIER)) {
- field = nullOr(field, Expressions.convert_(millisField, int.class));
+ return nullOr(value, Expressions.convert_(millisField, int.class));
} else if (logicalId.equals(DateType.IDENTIFIER)) {
- field =
+ value =
nullOr(
- field,
+ value,
Expressions.convert_(
Expressions.divide(millisField,
Expressions.constant(MILLIS_PER_DAY)),
int.class));
} else if (!logicalId.equals(CharType.IDENTIFIER)) {
throw new IllegalArgumentException(
- "Unknown LogicalType " +
fromType.getLogicalType().getIdentifier());
+ "Unknown LogicalType " + type.getLogicalType().getIdentifier());
}
- } else if (CalciteUtils.isDateTimeType(fromType)) {
- field = nullOr(field, Expressions.call(field, "getMillis"));
- } else if (fromType.getTypeName().isCompositeType()
- || (fromType.getTypeName().isCollectionType()
- &&
fromType.getCollectionElementType().getTypeName().isCompositeType())) {
- field =
- Expressions.condition(
- Expressions.equal(field, Expressions.constant(null)),
- Expressions.constant(null),
- Expressions.call(WrappedList.class, "of", field));
- } else if (fromType.getTypeName() == TypeName.BYTES) {
- field =
- Expressions.condition(
- Expressions.equal(field, Expressions.constant(null)),
- Expressions.constant(null),
- Expressions.new_(ByteString.class, field));
+ } else if (CalciteUtils.isDateTimeType(type)) {
+ return nullOr(value, Expressions.call(value, "getMillis"));
+ } else if (type.getTypeName().isCompositeType()) {
+ return nullOr(value, row(value, type.getRowSchema()));
+ } else if (type.getTypeName().isCollectionType()) {
+ return nullOr(value, list(value, type.getCollectionElementType()));
+ } else if (type.getTypeName() == TypeName.BYTES) {
+ return nullOr(
+ value, Expressions.new_(ByteString.class,
Types.castIfNecessary(byte[].class, value)));
+ }
+
+ return value;
+ }
+
+ private static Expression list(Expression input, FieldType elementType) {
+ ParameterExpression value = Expressions.parameter(Object.class);
+
+ BlockBuilder block = new BlockBuilder();
+ block.add(value(value, elementType));
+
+ return Expressions.new_(
+ WrappedList.class,
+ ImmutableList.of(Types.castIfNecessary(List.class, input)),
+ ImmutableList.<MemberDeclaration>of(
+ Expressions.methodDecl(
+ Modifier.PUBLIC,
+ Object.class,
+ "value",
+ ImmutableList.of(value),
+ block.toBlock())));
+ }
+
+ private static Expression row(Expression input, Schema schema) {
+ ParameterExpression row = Expressions.parameter(Row.class);
+ ParameterExpression index = Expressions.parameter(int.class);
+ BlockBuilder body = new BlockBuilder(/* optimizing= */ false);
Review comment:
Good question. Honestly, I don't know how optimizations work in linq4j, but
I expect we don't need any here; we would rather have a predictable code tree
for easier debugging. Enabling and understanding optimizations is something
to look into later. For instance, we currently generate code like
`(new ByteString((byte[])value).getBytes())`, which isn't going to be optimized.
We could address that by extending the code optimizer if needed.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 284547)
Time Spent: 4.5h (was: 4h 20m)
> byte[] breaks in BeamSQL codegen
> --------------------------------
>
> Key: BEAM-6783
> URL: https://issues.apache.org/jira/browse/BEAM-6783
> Project: Beam
> Issue Type: Bug
> Components: dsl-sql
> Reporter: Rui Wang
> Assignee: Gleb Kanterov
> Priority: Major
> Fix For: 2.15.0
>
> Time Spent: 4.5h
> Remaining Estimate: 0h
>
> Calcite will call `byte[].toString` because BeamSQL codegen reads `byte[]`
> from Row and passes it to Calcite (see:
> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java#L334).
>
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)