🤔 Problem

When event.log shows an error message like the following:

Outgoing publish message was dropped. Receiving consumer: nvqxa4djnzts2mbrpqvs6tlzirqxiyjpem======, topic: 1234-dev/MyData//1.234/STATION123E/NP/9876/ABCD/1146/12/3H, qos: 0, reason: The QoS 0 memory limit exceeded, size: 52,429,963 bytes, max: 52,428,800 bytes.

The reason "The QoS 0 memory limit exceeded" indicates that QoS 0 messages have been consumed more slowly than they were produced for some time. Once the QoS 0 memory limit is exceeded, further QoS 0 messages are dropped, because QoS 0 carries no delivery guarantee.

Note that the message contains a Receiving consumer ID instead of a Client ID. This indicates that the slow recipient is not an MQTT client but a HiveMQ Enterprise Extension consumer.

The event log message is produced by the following logging call:

logMessageDropped.debug(OUTBOUND_PUBLISH_DROPPED_CONSUMER_MARKER,
        "Outgoing publish message was dropped. Receiving consumer: {}, topic: {}, qos: {}, reason: {}.",
        valueFromNullable(CONSUMER_ID_KEY, consumerId),
        valueFromNullable(TOPIC_KEY, topic),
        value(QOS_KEY, qos),
        value(REASON_KEY, reason));

So the consumer ID is the text between "Receiving consumer: " and the next comma.
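To pull the consumer ID out of a log line programmatically, a regular expression that follows the message format above is usually enough. The following is a minimal sketch, not part of HiveMQ: the function name `extract_consumer_id` is hypothetical, and it assumes the consumer ID itself contains no comma.

```python
import re
from typing import Optional

# Mirrors the log format above: the ID sits between
# "Receiving consumer: " and the comma that precedes "topic: ".
# Assumption: the consumer ID itself contains no comma.
CONSUMER_RE = re.compile(r"Receiving consumer: (?P<consumer>[^,]*), topic: ")


def extract_consumer_id(log_line: str) -> Optional[str]:
    """Return the consumer ID from a dropped-message log line, or None."""
    match = CONSUMER_RE.search(log_line)
    return match.group("consumer") if match else None


line = (
    "Outgoing publish message was dropped. "
    "Receiving consumer: nvqxa4djnzts2mbrpqvs6tlzirqxiyjpem======, "
    "topic: 1234-dev/MyData//1.234/STATION123E/NP/9876/ABCD/1146/12/3H, "
    "qos: 0, reason: The QoS 0 memory limit exceeded, "
    "size: 52,429,963 bytes, max: 52,428,800 bytes."
)
print(extract_consumer_id(line))  # nvqxa4djnzts2mbrpqvs6tlzirqxiyjpem======
```

Lines that do not match the pattern yield None, so the function can be applied to every line of event.log.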

Each extension generates the consumer ID in its own way.

This article explains how to find out which HiveMQ Enterprise Extension a consumer belongs to and how to identify the corresponding item in the extension's configuration file.

🌱 Extension for Google Pub/Sub

  1. Look at the configuration XML of the MQTT to Google Cloud Pub/Sub Extension. A mapping looks like this:

    <mqtt-to-pubsub-mappings>
        <mqtt-to-pubsub-mapping>
            <id>mapping-01</id> <!-- REQUIRED -->
            <pubsub-connection>your-custom-connection-id</pubsub-connection> <!-- REQUIRED -->
            <preserve-message-properties>true</preserve-message-properties> <!-- OPTIONAL, Default = false -->
            <mqtt-topic-filters> <!-- REQUIRED -->
                <mqtt-topic-filter>+/MyData/#</mqtt-topic-filter> <!-- REQUIRED, at least one -->
                <mqtt-topic-filter>+/YourData/#</mqtt-topic-filter> <!-- OPTIONAL -->
                <mqtt-topic-filter>+/OurData/#</mqtt-topic-filter> <!-- OPTIONAL -->
            </mqtt-topic-filters>
            ...
            ...
        </mqtt-to-pubsub-mapping>
    </mqtt-to-pubsub-mappings>

    Here, the mapping entity <id>mapping-01</id> contains multiple topic filters, such as <mqtt-topic-filter>+/MyData/#</mqtt-topic-filter>.

  2. Each unique consumer ID consists of the mapping entity ID and one topic filter, joined by a | character:

    final String consumerId = entityId + "|" + topicFilter;

  3. The consumer ID is then encoded to avoid unsupported characters. The encoding is Base32 in lowercase letters and without padding characters:

    BaseEncoding CONSUMER_ID_ENCODING = BaseEncoding.base32().lowerCase().omitPadding();

    Encoding the ConsumerId:

    return CONSUMER_ID_ENCODING.encode(consumerId.getBytes(StandardCharsets.UTF_8));

  4. Decode the consumer ID nvqxa4djnzts2mbrpqvs6tlzirqxiyjpem====== into a human-readable form, for example with Python:

    import base64
    
    encoded_string = "nvqxa4djnzts2mbrpqvs6tlzirqxiyjpem======"
    decoded_bytes = base64.b32decode(encoded_string.encode(), casefold=True)
    decoded_string = decoded_bytes.decode()
    
    print(decoded_string) # Output: mapping-01|+/MyData/#

    The output mapping-01|+/MyData/# means that the consumer corresponds to the mapping with ID mapping-01 and the topic filter +/MyData/# in the Pub/Sub configuration file.

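  5. Find the corresponding entity in the Google Cloud Pub/Sub configuration file: the <mqtt-to-pubsub-mapping> with <id>mapping-01</id> and, inside it, the <mqtt-topic-filter>+/MyData/#</mqtt-topic-filter>.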

If a Pub/Sub consumer frequently drops messages with the reason "The QoS 0 memory limit exceeded", this can indicate that Pub/Sub is consuming messages too slowly or that the network connection is unstable.
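The encoding and decoding steps above can be wrapped in two small helpers. This is a sketch in Python rather than the extension's Java code, and the function names are hypothetical. Python's `base64.b32decode` expects padded input, so the decoder strips any trailing `=` and recomputes the padding. That lets it handle both the padded form shown in the log line and the unpadded form that `omitPadding()` produces. It also assumes that the mapping ID contains no `|` character.

```python
import base64
from typing import Tuple


def encode_consumer_id(entity_id: str, topic_filter: str) -> str:
    """Lowercase, unpadded Base32 of "<entity_id>|<topic_filter>"."""
    raw = f"{entity_id}|{topic_filter}".encode("utf-8")
    return base64.b32encode(raw).decode("ascii").lower().rstrip("=")


def decode_consumer_id(consumer_id: str) -> Tuple[str, str]:
    """Return (entity_id, topic_filter) for an encoded consumer ID."""
    stripped = consumer_id.rstrip("=")
    # Base32 input length must be a multiple of 8; restore the padding.
    padded = stripped + "=" * (-len(stripped) % 8)
    decoded = base64.b32decode(padded, casefold=True).decode("utf-8")
    # Assumption: the entity ID contains no "|", so split on the first one.
    entity_id, _, topic_filter = decoded.partition("|")
    return entity_id, topic_filter


print(decode_consumer_id("nvqxa4djnzts2mbrpqvs6tlzirqxiyjpem======"))
# ('mapping-01', '+/MyData/#')
print(encode_consumer_id("mapping-01", "+/MyData/#"))
# nvqxa4djnzts2mbrpqvs6tlzirqxiyjpem
```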

🌱 Bridge Extension

  1. Look at the configuration XML of the Bridge Extension:

    <bridges>
        <bridge>
            <enabled>true</enabled>
            <name>my-bridge-1</name>
            ...
        </bridge>
        <bridge>
            <enabled>true</enabled>
            <name>my-bridge-2</name>
            ...
        </bridge>
    </bridges>

  2. The consumer ID is the bridge name, for example my-bridge-1.
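Because the consumer ID is the plain bridge name, finding the bridge is a simple lookup in the configuration. The sketch below uses Python's standard `xml.etree.ElementTree` on a shortened stand-in for the configuration shown above. A real file contains more elements, and the helper name `find_bridge` is hypothetical.

```python
import xml.etree.ElementTree as ET

# Shortened stand-in for the bridge configuration shown above.
BRIDGE_CONFIG = """
<bridges>
    <bridge>
        <enabled>true</enabled>
        <name>my-bridge-1</name>
    </bridge>
    <bridge>
        <enabled>true</enabled>
        <name>my-bridge-2</name>
    </bridge>
</bridges>
"""


def find_bridge(config_xml: str, consumer_id: str):
    """Return the <bridge> element whose <name> equals the consumer ID."""
    root = ET.fromstring(config_xml)
    for bridge in root.iter("bridge"):
        if bridge.findtext("name", default="").strip() == consumer_id:
            return bridge
    return None


bridge = find_bridge(BRIDGE_CONFIG, "my-bridge-2")
print(bridge is not None)  # True
```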

🌱 Amazon Kinesis Extension

The Amazon Kinesis Extension uses the same approach as the Google Pub/Sub Extension:

private static final @NotNull BaseEncoding CONSUMER_ID_ENCODING = BaseEncoding.base32().lowerCase().omitPadding();
...
final String consumerId = routeId + "|" + topicFilter;
return CONSUMER_ID_ENCODING.encode(consumerId.getBytes(StandardCharsets.UTF_8));

  1. Look at the extension configuration:

    <mqtt-to-kinesis-routes>
      <mqtt-to-kinesis-route>
          <id>my-mqtt-to-kinesis-route</id>
          <enabled>true</enabled>
          <aws-credential-profile-id>aws-credential-profile-01</aws-credential-profile-id>
          <region>eu-central-1</region>
          <mqtt-topic-filters>
              <mqtt-topic-filter>mqtt/topic/a</mqtt-topic-filter>
          </mqtt-topic-filters>
          ...

  2. The consumer ID is the lowercase Base32 encoding of the string my-mqtt-to-kinesis-route|mqtt/topic/a, which is nv4s23lror2c25dpfvvws3tfonuxgllsn52xizl4nvyxi5bporxxa2ldf5qq====.
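Instead of decoding the logged ID, one can also go the other way: compute the expected consumer ID for every route and topic filter in the configuration and look the logged value up. The sketch below assumes a shortened configuration. The second topic filter mqtt/topic/b is hypothetical and only there to show that each filter gets its own ID. Trailing `=` characters are ignored when comparing.

```python
import base64
import xml.etree.ElementTree as ET

# Shortened stand-in for the Kinesis configuration shown above;
# the filter mqtt/topic/b is a hypothetical addition.
KINESIS_CONFIG = """
<mqtt-to-kinesis-routes>
    <mqtt-to-kinesis-route>
        <id>my-mqtt-to-kinesis-route</id>
        <mqtt-topic-filters>
            <mqtt-topic-filter>mqtt/topic/a</mqtt-topic-filter>
            <mqtt-topic-filter>mqtt/topic/b</mqtt-topic-filter>
        </mqtt-topic-filters>
    </mqtt-to-kinesis-route>
</mqtt-to-kinesis-routes>
"""


def expected_consumer_ids(config_xml: str) -> dict:
    """Map each unpadded consumer ID to its (route ID, topic filter)."""
    result = {}
    root = ET.fromstring(config_xml)
    for route in root.iter("mqtt-to-kinesis-route"):
        route_id = route.findtext("id", default="").strip()
        for element in route.iter("mqtt-topic-filter"):
            topic_filter = (element.text or "").strip()
            raw = f"{route_id}|{topic_filter}".encode("utf-8")
            encoded = base64.b32encode(raw).decode("ascii").lower().rstrip("=")
            result[encoded] = (route_id, topic_filter)
    return result


logged = "nv4s23lror2c25dpfvvws3tfonuxgllsn52xizl4nvyxi5bporxxa2ldf5qq===="
print(expected_consumer_ids(KINESIS_CONFIG).get(logged.rstrip("=")))
# ('my-mqtt-to-kinesis-route', 'mqtt/topic/a')
```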

🌱 Kafka Extension

In the Kafka Extension, a consumer ID consists of the prefix kafka-extension_ followed by the mapping ID:

String CONSUMER_PREFIX = "kafka-extension_";
...
final String consumerId = CONSUMER_PREFIX + mqttToKafkaMapping.getId();

Look at the kafka-configuration.xml:

    <mqtt-to-kafka-mappings>
        <mqtt-to-kafka-mapping>
            <id>mapping01</id>
            <cluster-id>cluster01</cluster-id>
            <mqtt-topic-filters>
                <mqtt-topic-filter>#</mqtt-topic-filter>
            </mqtt-topic-filters>
            <kafka-topic>kafka-topic</kafka-topic>
        </mqtt-to-kafka-mapping>
    </mqtt-to-kafka-mappings>

The consumer ID then looks like kafka-extension_mapping01.
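Putting the rules from this article together, a consumer ID can be roughly classified by trying each scheme in turn. The following is a heuristic sketch, not HiveMQ code. The function name and the returned labels are hypothetical. It only knows the four extensions covered here, and it treats anything that is neither Kafka-prefixed nor valid Base32 containing a `|` as a bridge name, which is an assumption.

```python
import base64

KAFKA_PREFIX = "kafka-extension_"


def classify_consumer_id(consumer_id: str):
    """Guess the extension for a consumer ID; returns (label, detail)."""
    # Kafka: fixed prefix followed by the mapping ID.
    if consumer_id.startswith(KAFKA_PREFIX):
        return ("kafka", consumer_id[len(KAFKA_PREFIX):])

    # Pub/Sub and Kinesis: Base32 of "<id>|<topic filter>".
    stripped = consumer_id.rstrip("=")
    padded = stripped + "=" * (-len(stripped) % 8)
    try:
        decoded = base64.b32decode(padded, casefold=True).decode("utf-8")
    except ValueError:  # covers binascii.Error and UnicodeDecodeError
        decoded = ""
    if "|" in decoded:
        return ("pubsub-or-kinesis", decoded)

    # Fallback (assumption): a plain bridge name.
    return ("bridge", consumer_id)


print(classify_consumer_id("kafka-extension_mapping01"))
# ('kafka', 'mapping01')
print(classify_consumer_id("nvqxa4djnzts2mbrpqvs6tlzirqxiyjpem======"))
# ('pubsub-or-kinesis', 'mapping-01|+/MyData/#')
print(classify_consumer_id("my-bridge-1"))
# ('bridge', 'my-bridge-1')
```

Pub/Sub and Kinesis use the same encoding, so the decoded ID has to be compared with both configuration files to tell them apart.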