🤔 Problem
The event.log file shows an error message like the following:
Outgoing publish message was dropped. Receiving consumer: nvqxa4djnzts2mbrpqvs6tlzirqxiyjpem======, topic: 1234-dev/MyData//1.234/STATION123E/NP/9876/ABCD/1146/12/3H, qos: 0, reason: The QoS 0 memory limit exceeded, size: 52,429,963 bytes, max: 52,428,800 bytes.
The reason in the message, The QoS 0 memory limit exceeded, indicates that QoS 0 messages have been produced faster than they were consumed for some time. The QoS 0 memory limit applies per consumer. In the example above, the queued messages reached 52,429,963 bytes against a maximum of 52,428,800 bytes (50 MiB), so the QoS 0 message was dropped, because QoS 0 messages carry no delivery guarantee.
Note that the message contains no client ID, only the ID of the receiving consumer. This indicates that the slow recipient of the message is not an MQTT client but a HiveMQ Enterprise Extension consumer.
HiveMQ writes this event log message as follows:
logMessageDropped.debug(OUTBOUND_PUBLISH_DROPPED_CONSUMER_MARKER,
        "Outgoing publish message was dropped. Receiving consumer: {}, topic: {}, qos: {}, reason: {}.",
        valueFromNullable(CONSUMER_ID_KEY, consumerId),
        valueFromNullable(TOPIC_KEY, topic),
        value(QOS_KEY, qos),
        value(REASON_KEY, reason));
The consumer ID therefore appears after the words Receiving consumer: and before the next comma.
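As a small illustration, the consumer ID can be pulled out of such a log line with a regular expression. This is only a sketch: the pattern is derived from the format string shown above, the function name extract_consumer_id is hypothetical, and a consumer ID that itself contains a comma would not be matched correctly.

```python
import re
from typing import Optional

# Pattern assumed from the log format string shown above; adjust it if your
# HiveMQ version words the message differently.
CONSUMER_ID_PATTERN = re.compile(r"Receiving consumer: ([^,]+), topic: ")


def extract_consumer_id(log_line: str) -> Optional[str]:
    """Return the consumer ID from a dropped-message log line, or None."""
    match = CONSUMER_ID_PATTERN.search(log_line)
    return match.group(1).strip() if match else None


sample = (
    "Outgoing publish message was dropped. Receiving consumer: "
    "nvqxa4djnzts2mbrpqvs6tlzirqxiyjpem======, topic: "
    "1234-dev/MyData//1.234/STATION123E/NP/9876/ABCD/1146/12/3H, qos: 0, "
    "reason: The QoS 0 memory limit exceeded, size: 52,429,963 bytes, "
    "max: 52,428,800 bytes."
)
print(extract_consumer_id(sample))
```

Lines that do not contain the expected wording yield None, so the function can be applied to every line of event.log without pre-filtering.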
Each extension has its own way of generating the consumer ID that is logged under CONSUMER_ID_KEY.
This article explains how to determine which HiveMQ Enterprise Extension a consumer belongs to and how to identify the corresponding configuration item in the extension’s configuration files.
🌱 Extension for Google Pub/Sub
Look at the configuration XML of the Extension for Google Cloud Pub/Sub. An MQTT to Pub/Sub mapping looks like this:
<mqtt-to-pubsub-mappings>
    <mqtt-to-pubsub-mapping>
        <id>mapping-01</id> <!-- REQUIRED -->
        <pubsub-connection>your-custom-connection-id</pubsub-connection> <!-- REQUIRED -->
        <preserve-message-properties>true</preserve-message-properties> <!-- OPTIONAL, Default = false -->
        <mqtt-topic-filters> <!-- REQUIRED -->
            <mqtt-topic-filter>+/MyData/#</mqtt-topic-filter> <!-- REQUIRED, at least one -->
            <mqtt-topic-filter>+/YourData/#</mqtt-topic-filter> <!-- OPTIONAL -->
            <mqtt-topic-filter>+/OurData/#</mqtt-topic-filter> <!-- OPTIONAL -->
        </mqtt-topic-filters>
        ...
        ...
    </mqtt-to-pubsub-mapping>
</mqtt-to-pubsub-mappings>
Here, a mapping entity such as <id>mapping-01</id> can contain multiple topic filters, each in its own element, for example <mqtt-topic-filter>topic/a</mqtt-topic-filter>. Each unique consumer ID consists of the mapping entity ID and one topic filter:
final String consumerId = entityId + "|" + topicFilter;
The consumer ID is then encoded to avoid unsupported characters. The encoding is Base32 with lowercase letters and without padding characters:
BaseEncoding CONSUMER_ID_ENCODING = BaseEncoding.base32().lowerCase().omitPadding();
Encoding the ConsumerId:
return CONSUMER_ID_ENCODING.encode(consumerId.getBytes(StandardCharsets.UTF_8));
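To check a suspected mapping against a logged consumer ID, the same encoding can be reproduced outside the broker. The following is a minimal Python sketch of the Java snippets above, not the extension's own code; the function name encode_consumer_id is hypothetical. The sample IDs in this article end with = padding while the Java encoder omits it, so strip any trailing = from the logged ID before comparing.

```python
import base64


def encode_consumer_id(entity_id: str, topic_filter: str) -> str:
    """Build 'entityId|topicFilter' and Base32-encode it, lowercase, unpadded."""
    raw = f"{entity_id}|{topic_filter}".encode("utf-8")
    # b32encode returns uppercase with '=' padding; lower() and rstrip("=")
    # mimic BaseEncoding.base32().lowerCase().omitPadding().
    return base64.b32encode(raw).decode("ascii").lower().rstrip("=")


print(encode_consumer_id("mapping-01", "+/MyData/#"))
# nvqxa4djnzts2mbrpqvs6tlzirqxiyjpem
```

Encoding every id and topic filter pair from the configuration and comparing the results with the logged ID is an alternative to decoding the ID.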
To decode the consumer ID nvqxa4djnzts2mbrpqvs6tlzirqxiyjpem====== into a human-readable form, a short Python script is enough:
import base64

encoded_string = "nvqxa4djnzts2mbrpqvs6tlzirqxiyjpem======"
# casefold=True makes b32decode accept the lowercase alphabet
decoded_bytes = base64.b32decode(encoded_string.encode(), casefold=True)
decoded_string = decoded_bytes.decode()
print(decoded_string)  # Output: mapping-01|+/MyData/#
The output mapping-01|+/MyData/# means that the consumer corresponds to the mapping in the Pub/Sub configuration file with id mapping-01 and topic filter +/MyData/#. Find the corresponding entity in the Google Cloud Pub/Sub configuration file.
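For larger configuration files, the lookup can be scripted. The sketch below searches an XML document for the mapping whose id and topic filter match a decoded consumer ID. The embedded XML is a trimmed stand-in for the real file, the function name find_mapping is hypothetical, and the code assumes the configuration uses no XML namespace.

```python
import xml.etree.ElementTree as ET

# Trimmed-down stand-in for the extension configuration. In practice, load
# the real file with ET.parse(...).getroot(); its path is not assumed here.
CONFIG_XML = """
<mqtt-to-pubsub-mappings>
    <mqtt-to-pubsub-mapping>
        <id>mapping-01</id>
        <mqtt-topic-filters>
            <mqtt-topic-filter>+/MyData/#</mqtt-topic-filter>
            <mqtt-topic-filter>+/YourData/#</mqtt-topic-filter>
        </mqtt-topic-filters>
    </mqtt-to-pubsub-mapping>
</mqtt-to-pubsub-mappings>
"""


def find_mapping(root, decoded_consumer_id):
    """Return the mapping element matching 'id|topicFilter', or None."""
    mapping_id, _, topic_filter = decoded_consumer_id.partition("|")
    # iter() walks the whole tree, so the mappings may be nested at any depth.
    for mapping in root.iter("mqtt-to-pubsub-mapping"):
        if mapping.findtext("id") != mapping_id:
            continue
        filters = [f.text for f in mapping.iter("mqtt-topic-filter")]
        if topic_filter in filters:
            return mapping
    return None


root = ET.fromstring(CONFIG_XML)
match = find_mapping(root, "mapping-01|+/MyData/#")
print("found" if match is not None else "not found")
```

The ID is split at the first | only, so a topic filter that itself contains a | character is still compared in full.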
If a Pub/Sub consumer frequently drops messages with the reason “The QoS 0 memory limit exceeded”, this can indicate that Pub/Sub is consuming messages too slowly or that the network connection is unstable.
🌱 Bridge Extension
Look at the configuration XML of the Bridge Extension:
<bridges>
    <bridge>
        <enabled>true</enabled>
        <name>my-bridge-1</name>
        ...
    </bridge>
    <bridge>
        <enabled>true</enabled>
        <name>my-bridge-2</name>
        ...
    </bridge>
</bridges>
The consumer ID is the bridge name, for example my-bridge-1.
🌱 Amazon Kinesis Extension
The Amazon Kinesis Extension uses the same approach as the Google Pub/Sub Extension:
private static final @NotNull BaseEncoding CONSUMER_ID_ENCODING = BaseEncoding.base32().lowerCase().omitPadding();
...
final String consumerId = routeId + "|" + topicFilter;
return CONSUMER_ID_ENCODING.encode(consumerId.getBytes(StandardCharsets.UTF_8));
Look at the extension configuration:
<mqtt-to-kinesis-routes>
    <mqtt-to-kinesis-route>
        <id>my-mqtt-to-kinesis-route</id>
        <enabled>true</enabled>
        <aws-credential-profile-id>aws-credential-profile-01</aws-credential-profile-id>
        <region>eu-central-1</region>
        <mqtt-topic-filters>
            <mqtt-topic-filter>mqtt/topic/a</mqtt-topic-filter>
        </mqtt-topic-filters>
        ...
The consumer ID is the lowercase Base32 encoding of the string my-mqtt-to-kinesis-route|mqtt/topic/a → nv4s23lror2c25dpfvvws3tfonuxgllsn52xizl4nvyxi5bporxxa2ldf5qq====.
🌱 Kafka Extension
In the Kafka Extension, a consumer ID consists of the prefix kafka-extension_ followed by the mapping id:
String CONSUMER_PREFIX = "kafka-extension_";
...
final String consumerId = CONSUMER_PREFIX + mqttToKafkaMapping.getId();
Look at kafka-configuration.xml:
<mqtt-to-kafka-mappings>
    <mqtt-to-kafka-mapping>
        <id>mapping01</id>
        <cluster-id>cluster01</cluster-id>
        <mqtt-topic-filters>
            <mqtt-topic-filter>#</mqtt-topic-filter>
        </mqtt-topic-filters>
        <kafka-topic>kafka-topic</kafka-topic>
    </mqtt-to-kafka-mapping>
</mqtt-to-kafka-mappings>
The consumer ID looks like kafka-extension_mapping01.
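The ID schemes described in this article can be combined into a rough classifier for a logged consumer ID. The sketch below is a heuristic, not HiveMQ code: the function names are hypothetical, Pub/Sub and Kinesis IDs cannot be told apart because they share one scheme, and any ID that matches neither the Kafka prefix nor the Base32 scheme is simply assumed to be a bridge name.

```python
import base64

KAFKA_PREFIX = "kafka-extension_"  # prefix taken from the Kafka snippet above


def decode_base32_id(consumer_id):
    """Decode a lowercase Base32 ID, restoring padding if it was omitted."""
    stripped = consumer_id.rstrip("=")
    # b32decode requires the input length to be a multiple of 8.
    padded = stripped + "=" * (-len(stripped) % 8)
    try:
        return base64.b32decode(padded, casefold=True).decode("utf-8")
    except ValueError:
        # Covers binascii.Error (not Base32) and UnicodeDecodeError.
        return None


def describe_consumer(consumer_id):
    """Heuristically map a consumer ID to the ID scheme that produced it."""
    if consumer_id.startswith(KAFKA_PREFIX):
        return ("kafka", consumer_id[len(KAFKA_PREFIX):])
    decoded = decode_base32_id(consumer_id)
    if decoded is not None and "|" in decoded:
        # Pub/Sub and Kinesis share this scheme, so check both config files.
        return ("pubsub-or-kinesis", decoded)
    # Anything else is assumed to be a plain bridge name.
    return ("bridge", consumer_id)


for cid in (
    "kafka-extension_mapping01",
    "nvqxa4djnzts2mbrpqvs6tlzirqxiyjpem======",
    "my-bridge-1",
):
    print(describe_consumer(cid))
```

The second element of each result is the value to search for in the matching configuration file: the Kafka mapping id, the decoded id|topicFilter pair, or the bridge name.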