Suppress - surprise? - Kafka Streams and the suppress operator


February 25, 2020

Even though I'm in danger of talking about things that have already gotten around, I'd like to briefly talk about the suppress operator of the Kafka Streams API and a surprise we encountered when we first used it.

In one of our services we want to ensure that the API rate limit is not exceeded when communicating with FreshSales. The code snippet shown below processes a stream of FreshContact records, groups them by the FreshContact Id (which we use as key) and then merges the different properties of the FreshContact as they appear in different variations of the FreshContact records.

// process the and update contacts in FreshSales
streamsBuilder.<String, FreshContact>stream( KAFKA_TOPIC_FRESH_CONTACT.getValue() )
.peek( ( k, v ) -> LOGGER.debug( "Processing FreshContact {}={}", k, v ) )
.reduce( this::reduceFreshContacts )
.suppress( Suppressed.untilTimeLimit( Duration.ofSeconds( 60 ), Suppressed.BufferConfig.unbounded() ) )
.filterNot( ( key, value ) -> value.getId() == null )
.peek( ( key, value ) -> sleepUntilRateNotExceeded() )
.peek( this::notify )
.foreach( this::updateFreshContact );

The suppress operator shown above uses the Suppressed.untilTimeLimit method to set a time period of 60 seconds to wait before emitting the event further downstream. The second argument to this method, Supressed.BufferConfig.unbounded(), creates an unconstrained buffer for the events occurring during the wait period. As we are not expecting a high volume of unique FreshContacts we will not run into memory issues with this.
In the remaining part we skip any records without an Id, if necessary wait until the actual API usage rate (the current rate is tracked by a class property) is not longer exceeded, send a message to Slack about the updated FreshContact and finally, make the API call to FreshSales to update the FreshContact.

When we tested this the first time we were surprised about the fact that every now and again an expected update to a FreshContact did not happen although we could see a 'Processing FreshContact...' in the logs. What we were not aware at this time was the fact that the suppress operator is based on event-time and as long as no new records arrive the stream is basically frozen. There is an excellent Blog post from Confluent explaining this in detail in case you are battling with similar issues.



Working as a software engineer for many years mostly in the JVM environment. Skilled in Scrum and Agile practices, currently scrum master of