Oftentimes, it is not a problem to solve at all

What I did today was memorable to me. It was a shortcut through time indeed. How? Here is how.
I was trying to use a class' object reference in an annotation-based @KafkaListener, like this:

@Autowired
@Qualifier("kafkaConfigurationPropertiesAccessor")
private KafkaConfigurationProperties kafkaConfigurationProperties;

@KafkaListener(
        id = "process-payment-success",
//        topics = "${env.kafka.local-config.topic}",
        topics = "#{kafkaConfigurationProperties.getKafkaTopic()}",
        groupId = "${env.kafka.local-config.group-id}"
)
public void listen(@Payload List<String> messages,
                   @Header("kafka_receivedPartitionId") List<Integer> partitions,  // KafkaHeaders.RECEIVED_PARTITION_ID in Spring Boot 2.x was renamed to KafkaHeaders.RECEIVED_PARTITION in 3.x
                   @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
                   @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    log.info("Kafka GKS listener is active");
    for (String message : messages) {
        log.info("Message: {}, acknowledged from listener on topic {}", message, topics);
    }
}

While I was trying to get this working, I hit another error: an unresolved dependency on the MeterRegistry library (which matters because it drives the timing and metrics generation at the heart of the SRE effort). That failure kept the app from starting, which meant I could not hit a breakpoint around the block of code above, the very block I wanted to test and work with in order to decide whether this was the right approach for the refactor I was working on.
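
For context on why that dependency matters: MeterRegistry is Micrometer's entry point for recording metrics. As a rough illustration of the kind of timing it drives (the metric name, tag, and class below are hypothetical, not from the actual codebase), a listener batch could be timed like this:

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;

// Hypothetical sketch: times how long one batch of messages takes to process.
public class ListenerTimingSketch {
    private final MeterRegistry meterRegistry;

    public ListenerTimingSketch(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }

    public void process(String topic, Runnable batchHandler) {
        Timer.Sample sample = Timer.start(meterRegistry);   // start timing the batch
        batchHandler.run();                                  // do the actual message handling
        sample.stop(meterRegistry.timer("kafka.listener.batch", "topic", topic));  // record the duration, tagged by topic
    }
}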

In fact, it seemed like an important issue to resolve before I could test my refactor, which would only add a further enhancement to observability. But for that enhancement to be applied, the base solution has to be working, and right now it was not.
But the fact that I still had another way to run the application (from the terminal, using `java -jar <jar path>`) confirmed that the application code runs fine without the IDE, and that the problem lay only with the IDE, which could not find the dependency's byte code and kept complaining. Taken together, these facts let me infer that the unresolved-dependency issue was probably a false positive and did not need to be battled right now; better to comment out that piece of code and get back to resolving the method expressions using SpEL.

That’s when I was able to get back to focusing on the main issue at hand: consolidating the Kafka listeners to make them reusable between local and cloud-based Kafka broker modes. That resulted in the following refactor, and here’s the link to the official documentation that allowed me to deduce a method to apply in this scenario: https://docs.oracle.com/javaee/6/tutorial/doc/bnahu.html#bnahz.

import java.util.List;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Slf4j
@Service
@ConditionalOnExpression("'${env.kafka.mode}' != 'none'")
public class KafkaConsumerLocalOrCloud {

    @Autowired
    @Qualifier("kafkaConfigurationPropertiesAccessor")
    private KafkaConfigurationProperties kafkaConfigurationPropertiesAccessor;

    // Method expressions using SpEL: https://docs.oracle.com/javaee/6/tutorial/doc/bnahu.html#bnahz
    @KafkaListener(
            id = "#{kafkaConfigurationPropertiesAccessor.getKafkaListenerId()}",
            topics = "#{kafkaConfigurationPropertiesAccessor.getKafkaTopic()}",
            groupId = "#{kafkaConfigurationPropertiesAccessor.getGroupId()}",
            containerFactory = "#{kafkaConfigurationPropertiesAccessor.getContainerFactory()}",
            clientIdPrefix = "#{kafkaConfigurationPropertiesAccessor.getClientIdPrefix()}"
    )
    public void listen(@Payload List<String> messages,
                       @Header("kafka_receivedPartitionId") List<Integer> partitions,  // KafkaHeaders.RECEIVED_PARTITION_ID in Spring Boot 2.x was renamed to KafkaHeaders.RECEIVED_PARTITION in 3.x
                       @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
                       @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
        log.info("Kafka GKS listener is active");
        for (String message : messages) {
            log.info("Message: {}, acknowledged from listener on topic {}", message, topics);
        }
    }
}
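
For completeness, here is a rough sketch of what the qualified `kafkaConfigurationPropertiesAccessor` bean could look like. The real `KafkaConfigurationProperties` class is not shown in this post, so the property keys, the local/cloud mode values, and the container-factory bean names below are all assumptions; the point is only that the SpEL expressions in the listener resolve against this bean by its name, and that its getters pick between local and cloud values based on `env.kafka.mode`.

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

// Hypothetical sketch of the accessor bean referenced by the SpEL expressions above.
@Component("kafkaConfigurationPropertiesAccessor")
public class KafkaConfigurationProperties {

    @Value("${env.kafka.mode}")                    // assumed values: "local" or "cloud"
    private String mode;

    @Value("${env.kafka.local-config.topic}")
    private String localTopic;

    @Value("${env.kafka.cloud-config.topic:}")     // assumed property key
    private String cloudTopic;

    @Value("${env.kafka.local-config.group-id}")
    private String localGroupId;

    @Value("${env.kafka.cloud-config.group-id:}")  // assumed property key
    private String cloudGroupId;

    private boolean isLocal() {
        return "local".equalsIgnoreCase(mode);
    }

    public String getKafkaListenerId() {
        return "process-payment-success";
    }

    public String getKafkaTopic() {
        return isLocal() ? localTopic : cloudTopic;
    }

    public String getGroupId() {
        return isLocal() ? localGroupId : cloudGroupId;
    }

    public String getContainerFactory() {
        // assumed bean names for the local and cloud listener container factories
        return isLocal() ? "localKafkaListenerContainerFactory" : "cloudKafkaListenerContainerFactory";
    }

    public String getClientIdPrefix() {
        return "payment-" + mode;
    }
}

With an accessor along these lines, switching between the local and cloud brokers comes down to flipping `env.kafka.mode`, which is exactly what the consolidated listener needs.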

That’s it. Both issues were resolved. If I had instead spent more time on the IDE issue, it would have led me down a rabbit hole, as there was no relationship between my actual focal point (using method expressions to access properties dynamically and switch listener modes) and the issue at hand (the app not running from the IDE while it runs fine from the terminal).

Gist: Find the right direction to invest time in at work. Do not go off on a tangent from your focal point of interest. Have a mechanism to tie your efforts back to a pivot that keeps you aligned in the right direction. In this case, it was getting the method expressions using SpEL working so that the Kafka listeners could work dynamically with both a locally running Kafka broker/server and a cloud-based Kafka broker/server and their respective topics.
