Introduction – Apache Avro and Protobuf
After learning some details about Protobuf in the second part of the IoT series, we will now give an introduction to Apache Avro, work out how it compares to Protobuf, and see whether it is suitable for use in IoT devices.
Apache Avro Basics
Avro is a data serialization framework that was developed as part of Apache Hadoop.
It is mainly used in big data processing and is also supported by some of the data processing solutions in Microsoft Azure.
Schemas
Avro uses schemas to structure the data. Schemas are usually defined in JSON, but there is also support for an IDL. This post will concentrate on the JSON format.
As an example, we will now recreate the environment sensor messages from the Protobuf post as a JSON schema. Some changes are necessary due to differences between Protobuf and Avro:
- Avro does not support unsigned types. The timestamp becomes a 64-bit signed integer.
- Contrary to Protobuf, where all fields are optional, Avro does not support optional fields. Every field should get a default value to allow removing it when changing the schema.
- Avro does not have Protobuf's oneof feature. As a replacement, TelemetryMessage gets a field named payload whose type is a union of the two types EnvironmentData and Event.
{
  "namespace": "iotexample",
  "name": "DeviceMessages",
  "type": "record",
  "fields": [
    {
      "name": "telemetry_messages",
      "type": {
        "namespace": "iotexample",
        "name": "TelemetryMessagesArray",
        "type": "array",
        "items": {
          "namespace": "iotexample",
          "name": "TelemetryMessage",
          "type": "record",
          "fields": [
            {
              "name": "timestamp",
              "type": "long",
              "default": 0
            },
            {
              "name": "payload",
              "type": [
                {
                  "namespace": "iotexample",
                  "name": "EnvironmentData",
                  "type": "record",
                  "fields": [
                    {
                      "name": "temperature",
                      "type": "double",
                      "default": 0
                    },
                    {
                      "name": "pressure",
                      "type": "double",
                      "default": 0
                    },
                    {
                      "name": "humidity",
                      "type": "double",
                      "default": 0
                    },
                    {
                      "name": "co2_level",
                      "type": "double",
                      "default": 0
                    }
                  ]
                },
                {
                  "namespace": "iotexample",
                  "name": "Event",
                  "type": "record",
                  "fields": [
                    {
                      "name": "message",
                      "type": "string",
                      "default": ""
                    },
                    {
                      "name": "event_number",
                      "type": "int",
                      "default": 0
                    },
                    {
                      "name": "error_level",
                      "type": {
                        "namespace": "iotexample",
                        "name": "ErrorLevel",
                        "type": "enum",
                        "symbols": [
                          "UNSPECIFIED",
                          "ERROR",
                          "WARNING",
                          "INFO"
                        ]
                      },
                      "default": "UNSPECIFIED"
                    }
                  ]
                }
              ]
            }
          ]
        }
      }
    }
  ]
}
Schema Evolution
Changing an existing Avro schema (removing fields, adding new fields, etc.) reveals the main difference from Protobuf.
We have learned that Protobuf messages can be evolved in a way that enables a consumer which only knows about the new version to consume messages created with the old version and vice versa.
In Avro, this is not possible: the consumer must always know the schema that was used to serialize the message. There are different levels of compatibility which allow different kinds of changes; they are described in the Avro documentation.
Systems using Avro usually employ a schema registry where all versions of a schema are stored. Messages must then be prefixed with the identifier of the schema used by the producer to allow the consumer to decode the message.
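The exact framing varies between systems; Confluent's Schema Registry, for example, prepends a magic byte and a 4-byte schema id to every message. A minimal sketch of such a prefix in Node.js (frameMessage and unframeMessage are our own illustrative helpers, not part of any Avro library):

```javascript
// Prefix a serialized Avro payload with a schema identifier so the
// consumer can look up the writer's schema in a registry.
// Framing used here (an assumption for illustration): 1 magic byte (0x00)
// followed by a 4-byte big-endian schema id, then the Avro-encoded payload.
function frameMessage(schemaId, payload) {
  const header = Buffer.alloc(5);
  header.writeUInt8(0x00, 0);         // magic byte
  header.writeUInt32BE(schemaId, 1);  // registry schema id
  return Buffer.concat([header, payload]);
}

function unframeMessage(framed) {
  if (framed.readUInt8(0) !== 0x00) {
    throw new Error('unknown framing');
  }
  return {
    schemaId: framed.readUInt32BE(1),  // use this to fetch the writer's schema
    payload: framed.subarray(5)        // hand this to the Avro decoder
  };
}
```

Before decoding the payload, the consumer uses the extracted schemaId to fetch the writer's schema from the registry.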
Programming with Avro in C++
To demonstrate the C++ API, we again create a Qt based application. This application uses the Avro C++ library, which must be built and installed. The C++ version also provides a code generator named avrogencpp. We use it to create the message structs and the serialization and deserialization code:

avrogencpp -i iotmessages.json -o iotmessages.h -n iotexample
Walkthrough
First, we need some includes for Avro and the Qt features we are going to use:

#include "iotmessages.h"
#include <sstream>
#include <avro/ValidSchema.hh>
#include <avro/Compiler.hh>
#include <QCoreApplication>
#include <QDateTime>
#include <QDebug>
#include <QFile>
Then we use the generated structs from iotmessages.h and populate a DeviceMessages message.
int main()
{
    // Create and populate a TelemetryMessage with EnvironmentData payload
    iotexample::TelemetryMessage sensorDataMessage;
    sensorDataMessage.timestamp = QDateTime::currentMSecsSinceEpoch();

    iotexample::EnvironmentData payload;
    payload.temperature = 23;
    payload.pressure = 1080;
    payload.humidity = 75;
    payload.co2_level = 415;
    sensorDataMessage.payload.set_EnvironmentData(payload);

    // Create and populate a TelemetryMessage with Event payload
    iotexample::TelemetryMessage eventMessage;
    eventMessage.timestamp = QDateTime::currentMSecsSinceEpoch();

    iotexample::Event event;
    event.message = std::string("My event message");
    event.error_level = iotexample::ErrorLevel::ERROR;
    event.event_number = 123;
    eventMessage.payload.set_Event(event);

    // Create a DeviceMessages message and add the two messages
    iotexample::DeviceMessages messages;
    messages.telemetry_messages.push_back(sensorDataMessage);
    messages.telemetry_messages.push_back(eventMessage);
In the next step, we will serialize the message and print the serialized data size and the base64-encoded serialized data.
    // Create output stream and encoder
    auto out = avro::memoryOutputStream();
    auto encoder = avro::binaryEncoder();
    encoder->init(*out);

    // Encode the message
    avro::encode(*encoder, messages);
    encoder->flush();
    qDebug() << "Serialized size:" << out->byteCount() << "bytes";

    // Create input stream and reader
    auto in = avro::memoryInputStream(*out);
    auto reader = avro::StreamReader(*in);
    QByteArray data(out->byteCount(), Qt::Uninitialized);
    reader.readBytes(reinterpret_cast<quint8 *>(data.data()), out->byteCount());
    qDebug() << "Serialized data:" << data.toBase64();
To demonstrate the decoder, we will now use a validatingDecoder to deserialize the message. This decoder is initialized with the schema and throws an exception if the data does not match.
After decoding the message, the content is printed. The printing code also shows how to handle the union we used as oneof replacement.
    // Load schema file from the resource
    QFile schemaFile(":iotmessages.json");
    schemaFile.open(QFile::ReadOnly);
    auto schemaInput = std::istringstream(schemaFile.readAll().toStdString());

    avro::ValidSchema schema;
    try {
        avro::compileJsonSchema(schemaInput, schema);
    } catch (std::exception &ex) {
        qWarning() << "Failed to compile schema:" << ex.what();
        return 1;
    }

    // Create input stream and a validating binary decoder
    in = avro::memoryInputStream(*out);
    auto decoder = avro::binaryDecoder();
    decoder->init(*in);
    auto validatingDecoder = avro::validatingDecoder(schema, decoder);

    iotexample::DeviceMessages decoded;
    try {
        avro::decode(*validatingDecoder, decoded);
    } catch (std::exception &ex) {
        qWarning() << "Decode failed with:" << ex.what();
        return 1;
    }

    qDebug() << "Decoded message with" << decoded.telemetry_messages.size() << "values";
    for (uint i = 0; i < decoded.telemetry_messages.size(); ++i) {
        const auto &current = decoded.telemetry_messages.at(i);
        qDebug() << "Message" << i + 1;
        qDebug() << " Timestamp:" << QDateTime::fromMSecsSinceEpoch(current.timestamp).toString(Qt::ISODate);
        if (current.payload.idx() == 1) {
            qDebug() << " Event number:" << current.payload.get_Event().event_number;
            qDebug() << " Event error level:" << (int) current.payload.get_Event().error_level;
            qDebug() << " Event message:" << QString::fromStdString(current.payload.get_Event().message);
        } else if (current.payload.idx() == 0) {
            qDebug() << " Temperature:" << current.payload.get_EnvironmentData().temperature;
            qDebug() << " Pressure:" << current.payload.get_EnvironmentData().pressure;
            qDebug() << " Humidity:" << current.payload.get_EnvironmentData().humidity;
            qDebug() << " CO2:" << current.payload.get_EnvironmentData().co2_level;
        } else {
            qDebug() << " Empty TelemetryMessage";
        }
    }
    return 0;
}
Building the example
We use qmake to build the example application. The JSON schema is included as a resource and is also used to generate the C++ code at build time.
QT -= gui
CONFIG += c++11 console
CONFIG -= app_bundle
INCLUDEPATH += /opt/avro/include
LIBS += -L/opt/avro/lib -lavrocpp
HEADERS += \
    iotmessages.h

SOURCES += \
    main.cpp

AVRO_COMPILER = /opt/avro/bin/avrogencpp
command = $$AVRO_COMPILER -i iotmessages.json -o iotmessages.h -n iotexample
!system($$command) {
    error("avrogencpp is required to build this application")
}

RESOURCES += \
    schema.qrc
Result
Running the application produces the following output.
We see that the serialized data is 68 bytes in size, which is less than the 80 bytes needed by Protobuf. The main cause is that Avro writes neither field numbers nor presence information for optional fields into the encoded data.
Serialized size: 68 bytes
Serialized data: "BLCsqYnDXAAAAAAAAAA3QAAAAAAA4JBAAAAAAADAUkAAAAAAAPB5QLCsqYnDXAIgTXkgZXZlbnQgbWVzc2FnZfYBAgA="
Decoded message with 2 values
Message 1
Timestamp: "2020-05-15T15:45:28"
Temperature: 23
Pressure: 1080
Humidity: 75
CO2: 415
Message 2
Timestamp: "2020-05-15T15:45:28"
Event number: 123
Event error level: 1
Event message: "My event message"
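Part of this compactness comes from Avro's binary encoding: doubles are written as raw 8-byte values and int/long values as zigzag-encoded varints, with no field tags in between. A short sketch of the zigzag varint scheme in Node.js (encodeLong and decodeLong are our own helpers, shown only to illustrate the idea, not the Avro library API):

```javascript
// Avro writes int/long values as zigzag-encoded varints: zigzag maps
// small positive and negative numbers to small unsigned numbers, and the
// varint stores 7 bits per byte, using the high bit as a continuation flag.
function encodeLong(n) {
  let u = (BigInt(n) << 1n) ^ (BigInt(n) >> 63n); // zigzag
  const bytes = [];
  while (u >= 0x80n) {
    bytes.push(Number((u & 0x7fn) | 0x80n)); // 7 data bits + continuation bit
    u >>= 7n;
  }
  bytes.push(Number(u));
  return Buffer.from(bytes);
}

function decodeLong(buf) {
  let u = 0n, shift = 0n;
  for (const b of buf) {
    u |= BigInt(b & 0x7f) << shift;
    shift += 7n;
    if ((b & 0x80) === 0) break;
  }
  return (u >> 1n) ^ -(u & 1n); // undo zigzag
}
```

For example, event_number = 123 takes only two bytes (0xf6 0x01). Protobuf uses the same varint idea for its integer values, but additionally writes a tag per field, which is where part of the size difference comes from.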
Programming with Avro in JavaScript / Node.js
We will now do the same encode/decode routine in Node.js. This example requires the avro-js module.

const avro = require('avro-js');
// Parse the schema and create the corresponding type
const deviceRecordMessage = avro.parse('./iotmessages.json');
// Create a sample message
const messageToEncode = {
  telemetry_messages: [
    {
      timestamp: Date.now(),
      payload: {
        'iotexample.EnvironmentData': {
          temperature: 23,
          pressure: 1080,
          humidity: 75,
          co2_level: 415
        }
      }
    },
    {
      timestamp: Date.now(),
      payload: {
        'iotexample.Event': {
          message: 'My event message',
          event_number: 123,
          error_level: 'ERROR'
        }
      }
    }
  ]
};
// Encode the message
const data = deviceRecordMessage.toBuffer(messageToEncode);
// Print encoded data as base64
console.log(`Encoded data: "${data.toString('base64')}", length: ${data.length}`);
// Decode the message
const decoded = deviceRecordMessage.fromBuffer(data);
// Print the content of the decoded message
console.log(JSON.stringify(decoded, null, 2));
Running the example shows the same encoded layout as the C++ example (only the timestamp bytes differ). The decoded data is JSON and follows the form that can be expected from the schema.
Encoded data: "BOKUk+vEXAAAAAAAAAA3QAAAAAAA4JBAAAAAAADAUkAAAAAAAPB5QOKUk+vEXAIgTXkgZXZlbnQgbWVzc2FnZfYBAgA=", length: 68
{
  "telemetry_messages": [
    {
      "timestamp": 1589787125041,
      "payload": {
        "iotexample.EnvironmentData": {
          "temperature": 23,
          "pressure": 1080,
          "humidity": 75,
          "co2_level": 415
        }
      }
    },
    {
      "timestamp": 1589787125041,
      "payload": {
        "iotexample.Event": {
          "message": "My event message",
          "event_number": 123,
          "error_level": "ERROR"
        }
      }
    }
  ]
}
Schema evolution in JavaScript
We will now have a look at what happens if the consumer in the cloud uses an updated schema and receives a message encoded with an older schema.

const avro = require('avro-js');
const fs = require('fs');

const schema = JSON.parse(fs.readFileSync('./iotmessages.json'));
// Parse the schema and create the corresponding type
const deviceRecordMessage = avro.parse(schema);
// Create a sample message
const messageToEncode = {
  telemetry_messages: [
    {
      timestamp: Date.now(),
      payload: {
        'iotexample.EnvironmentData': {
          temperature: 23,
          pressure: 1080,
          humidity: 75,
          co2_level: 415
        }
      }
    },
    {
      timestamp: Date.now(),
      payload: {
        'iotexample.Event': {
          message: 'My event message',
          event_number: 123,
          error_level: 'ERROR'
        }
      }
    }
  ]
};
// Encode the message
const data = deviceRecordMessage.toBuffer(messageToEncode);
// Print encoded data as base64
console.log(`Encoded data: "${data.toString('base64')}", length: ${data.length}`);
The first change renames the co2_level field to ambient_light; we then attempt to decode the encoded data with the new schema.
let decoded = null;

// Modify the schema: rename co2_level to ambient_light
let modifiedSchema = JSON.parse(JSON.stringify(schema));
modifiedSchema.fields[0].type.items.fields[1].type[0].fields[3].name = 'ambient_light';

let deviceRecordMessageEvolved = avro.parse(modifiedSchema);
decoded = deviceRecordMessageEvolved.fromBuffer(data);
console.log(`${decoded.telemetry_messages[0].payload['iotexample.EnvironmentData'].ambient_light} was the value of co2_level`);
Running the code shows that the value that was set in the co2_level field is now decoded as ambient_light, because the semantic change exists only in the schema while the encoded data has not changed.
The second modified schema adds the ambient_light field in addition to the co2_level field.
// Modify the schema: add field ambient_light in addition to co2_level
modifiedSchema = JSON.parse(JSON.stringify(schema));
modifiedSchema.fields[0].type.items.fields[1].type[0].fields.push(
  {
    name: 'ambient_light',
    type: 'double',
    default: 0
  }
);
deviceRecordMessageEvolved = avro.parse(modifiedSchema);
try {
  decoded = deviceRecordMessageEvolved.fromBuffer(data);
} catch (ex) {
  console.error(`Failed to decode the message encoded with the old schema: ${ex}`);
}
The decoder now fails with Error: invalid union index: -61, because the decoder expects an additional double value that is not present in the encoded data and therefore misinterprets the remaining bytes.
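This failure illustrates that Avro's encoding is purely positional: the bytes carry no information about which field they belong to, so the reader must walk the writer's schema field by field. A stripped-down illustration in plain Node.js (not the Avro codec; the two-field layout and readDoubles helper are our own simplification):

```javascript
// The "writer" serialized two doubles back to back. Avro stores a double
// as 8 little-endian bytes, with no tags or field names in the stream.
const written = Buffer.alloc(16);
written.writeDoubleLE(23, 0);   // temperature
written.writeDoubleLE(1080, 8); // pressure

// A reader that assumes a different field count misreads the stream:
// it cannot detect that a field is "missing", it simply runs out of
// bytes (or swallows bytes that belong to the next value).
function readDoubles(buf, expectedFields) {
  const values = [];
  for (let i = 0; i < expectedFields; i++) {
    if ((i + 1) * 8 > buf.length) {
      throw new Error(`unexpected end of data while reading field ${i}`);
    }
    values.push(buf.readDoubleLE(i * 8));
  }
  return values;
}
```

A reader expecting two doubles recovers [23, 1080], but one expecting three fails partway through, much like the Avro decoder did above.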
We now decode the message using a resolver which maps the encoded data to the new schema with the additional field.
// Create a resolving decoder and decode the message
let resolver = deviceRecordMessageEvolved.createResolver(deviceRecordMessage);
decoded = deviceRecordMessageEvolved.fromBuffer(data, resolver);
console.log(JSON.stringify(decoded, null, 2));
The data is now decoded as expected; the ambient_light field, which is not present in the encoded data, is set to the default value that was specified in the schema.
{
  "telemetry_messages": [
    {
      "timestamp": 1589784854837,
      "payload": {
        "iotexample.EnvironmentData": {
          "temperature": 23,
          "pressure": 1080,
          "humidity": 75,
          "co2_level": 415,
          "ambient_light": 0
        }
      }
    },
    {
      "timestamp": 1589784854837,
      "payload": {
        "iotexample.Event": {
          "message": "My event message",
          "event_number": 123,
          "error_level": "ERROR"
        }
      }
    }
  ]
}
Finally, we add the ambient_light field without specifying a default value and repeat the previous test.

modifiedSchema = JSON.parse(JSON.stringify(schema));
modifiedSchema.fields[0].type.items.fields[1].type[0].fields.push(
  {
    name: 'ambient_light',
    type: 'double'
  }
);
deviceRecordMessageEvolved = avro.parse(modifiedSchema);
try {
  resolver = deviceRecordMessageEvolved.createResolver(deviceRecordMessage);
} catch (ex) {
  console.error(`Decode failed without default value: ${ex}`);
}
The attempt to create a resolver fails due to the missing default value:
Decode failed without default value: Error: cannot read "iotexample.EnvironmentData" as ["iotexample.EnvironmentData","iotexample.Event"]
Conclusion
As we have seen from the example, an Avro message must always be prefixed with some information about which schema was used to encode it; otherwise the decoder will either fail or produce invalid data. Adding default values to the schema is very important to allow a field to be removed later.
Despite the slightly smaller encoded data size for Avro, the ability to update Protobuf message definitions in a compatible way without having to prefix the encoded data with a schema identifier makes Protobuf the better choice for data transmission from IoT devices.
2 Responses
In Avro, you can define a field type as a union of [required_type, null], and the default will be null. I do not think the point about having to define a default to enable removing a field later is correct.
Thank you for the hint. According to https://issues.apache.org/jira/browse/AVRO-1803 the field must also have a default value (in this case null). Is this no longer the case?