Building microservices with Netflix OSS, Apache Kafka and Spring Boot – Part 2: Message Broker and User service


by Iskren Ivanov

November 7, 2017



After Building microservices with Netflix OSS, Apache Kafka and Spring Boot – Part 1: Service registry and Config server, here is what comes next:

Message Broker (Kafka & ZooKeeper)

Although we are not going to use the distributed features of Kafka for this test, it is still a distributed system and relies on ZooKeeper to track the status of its cluster nodes, topics, partitions, etc. So ZooKeeper has to be installed before Kafka can be used. The following commands install ZooKeeper and Kafka on Ubuntu 16.04.

Install ZooKeeper

$ sudo apt-get install zookeeperd

Once installed, ZooKeeper starts automatically as a daemon and listens on port 2181 by default.

Ask ZooKeeper if it is OK

$ telnet localhost 2181

Type ruok and press Enter. You should get the answer imok, followed by: Connection closed by foreign host.
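The same health check can also be done from Java, which is handy if you want to script it. The following is only a minimal sketch: the class name ZooKeeperProbe and the 3-second timeouts are my own choices, and it assumes ZooKeeper answers the ruok command on the given port, as it does with the default Ubuntu 16.04 package.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the project built in this series.
class ZooKeeperProbe {

    // Sends a four-letter command (e.g. "ruok") and returns the raw reply.
    static String fourLetterWord(String host, int port, String command) throws IOException {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), 3000); // connect timeout (assumed value)
            socket.setSoTimeout(3000);                               // read timeout (assumed value)
            OutputStream out = socket.getOutputStream();
            out.write(command.getBytes(StandardCharsets.US_ASCII));
            out.flush();

            // The server closes the connection after replying, so read until end of stream.
            InputStream in = socket.getInputStream();
            ByteArrayOutputStream reply = new ByteArrayOutputStream();
            byte[] buffer = new byte[256];
            int read;
            while ((read = in.read(buffer)) != -1) {
                reply.write(buffer, 0, read);
            }
            return new String(reply.toByteArray(), StandardCharsets.US_ASCII);
        }
    }

    public static void main(String[] args) throws IOException {
        String reply = fourLetterWord("localhost", 2181, "ruok");
        System.out.println("imok".equals(reply) ? "ZooKeeper is healthy" : "Unexpected reply: " + reply);
    }
}
```

The helper takes the command as a parameter, so the same method should also work for the dump command used later to look for registered brokers.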

Download the latest Kafka

Go to https://kafka.apache.org/downloads and look for the latest binary release link (at the time of writing, kafka_2.12-0.11.0.1.tgz; the commands below use the Scala 2.11 build of the same release). Following the link takes you to a page that suggests a mirror site for your download.

$ mkdir ~/kafka
$ wget https://apache.cbox.biz/kafka/0.11.0.1/kafka_2.11-0.11.0.1.tgz
$ tar -xvzf kafka_2.11-0.11.0.1.tgz -C ~/kafka --strip-components=1

Configure the Kafka Server

You need to update the server.properties file. Deleting topics is disabled by default, so it is a good idea to enable it by adding delete.topic.enable at the end of the file.

$ nano ~/kafka/config/server.properties
delete.topic.enable = true
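If you are wondering whether the spaces around the equals sign matter, here is a small sketch that parses such a line with java.util.Properties. It assumes the broker reads server.properties with the standard Java properties rules; the class name ServerPropertiesCheck and the sample content are made up for the illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper for checking a setting in properties-file text.
class ServerPropertiesCheck {

    // Returns the value of delete.topic.enable, or null if the key is absent.
    static String deleteTopicEnable(String fileContent) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(fileContent));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty("delete.topic.enable");
    }

    public static void main(String[] args) {
        // Sample content only; the real file contains many more settings.
        String content = "broker.id=0\n" + "delete.topic.enable = true\n";
        System.out.println(deleteTopicEnable(content)); // prints "true"
    }
}
```

Whitespace around the separator is dropped by the parser, so "delete.topic.enable = true" and "delete.topic.enable=true" give the same result.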

Run the Kafka Server as a background process

$ nohup ~/kafka/bin/kafka-server-start.sh ~/kafka/config/server.properties > ~/kafka/kafka.log 2>&1 &

Verify Kafka is running

$ echo dump | nc localhost 2181 | grep brokers

User Service

Now that Kafka is running, we can continue with building the user microservice. As mentioned in Part 1, it will:

  • 1. Register itself with the Service registry (Eureka)
  • 2. Take its configuration from the Config server (Spring Cloud Config)
  • 3. Have two endpoints
    • POST /member – registers a new user
    • GET /member – returns all registered users
  • 4. On every new registration, send a “USER_REGISTERED” message to the message broker (Kafka)
  • 5. Store the registered users in an in-memory H2 database for later reference

Let’s first create a new Spring Boot project (ms-user) with Spring Initializr.

The following dependencies will be needed: Eureka Discovery, JPA, H2, Kafka and Config Client.

/pom.xml

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-config</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
    <scope>runtime</scope>
</dependency>

As with the config server, enable the discovery client by adding the @EnableEurekaClient annotation to the main Application class, and add the name and the port of the microservice to the application configuration file. What is new here is enabling config discovery: the microservice looks up the Config server through the Service registry, knowing only the Config server's service id, so no hardcoded URLs or ports are needed.

/bootstrap.yml

server:
  port: 8081
spring:
  application:
    name: ms-user
  cloud:
    config:
      discovery:
        enabled: true
        service-id: ms-config-server

The configuration for H2, the datasource and Kafka will be read from the Config server, so it goes in the ms-config-properties folder under ms-user.

/ms-user.yml

spring:
  h2:
    console:
      enabled: true
      path: /h2-console
  datasource:
    url: jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE
    username: sa
    password:
  kafka:
    bootstrap-servers: localhost:9092
    topic:
      userCreated: USER_CREATED_TOPIC
security:
  basic:
    enabled: false
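The nested keys in this file are later referenced by dotted names such as spring.kafka.bootstrap-servers. To make that correspondence visible, here is a rough sketch that flattens a nested map into dotted keys. It is not how Spring binds properties internally, only an illustration of the naming; the class PropertyNames is hypothetical and the map is built by hand instead of being parsed from YAML.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of how nested YAML keys relate to dotted property names.
class PropertyNames {

    // Turns nested maps into a flat map with dotted keys.
    static Map<String, Object> flatten(Map<String, Object> nested) {
        Map<String, Object> flat = new LinkedHashMap<>();
        flattenInto(flat, "", nested);
        return flat;
    }

    @SuppressWarnings("unchecked")
    private static void flattenInto(Map<String, Object> flat, String prefix, Map<String, Object> node) {
        for (Map.Entry<String, Object> entry : node.entrySet()) {
            String key = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey();
            if (entry.getValue() instanceof Map) {
                flattenInto(flat, key, (Map<String, Object>) entry.getValue());
            } else {
                flat.put(key, entry.getValue());
            }
        }
    }

    public static void main(String[] args) {
        // Hand-built equivalent of the kafka part of ms-user.yml.
        Map<String, Object> topic = new LinkedHashMap<>();
        topic.put("userCreated", "USER_CREATED_TOPIC");
        Map<String, Object> kafka = new LinkedHashMap<>();
        kafka.put("bootstrap-servers", "localhost:9092");
        kafka.put("topic", topic);
        Map<String, Object> spring = new LinkedHashMap<>();
        spring.put("kafka", kafka);
        Map<String, Object> root = new LinkedHashMap<>();
        root.put("spring", spring);

        flatten(root).forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Running it lists spring.kafka.bootstrap-servers and spring.kafka.topic.userCreated, which are the names used in the @Value placeholders further down.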

First we will create a simple Spring web project structure with a User entity, UserRepository, UserService and UserController. We will not discuss them in depth, as this structure is common in Spring projects.

The User entity will be used to transfer the data. It has a simple structure: just a username and a password. The username holds the email address to which a confirmation message will be sent.

/User.java

@Entity
public class User {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    @NotNull
    private String username;
    @NotNull
    private String password;

    // Getters and setters omitted for brevity
}

We will use Spring Data to handle the CRUD operations on the User entity, so UserRepository will be simple too.

/UserRepository.java

public interface UserRepository extends CrudRepository<User, Long> {
}

In the UserService we will have the methods for registering a user and getting all users

/UserService.java

public interface UserService {
    User registerUser(User input);
    Iterable<User> findAll();
}

Finally, in UserController, create the GET /member and POST /member REST endpoints. They just call the UserService and return its result.

@RequestMapping(method = RequestMethod.GET, path = "/member")
public ResponseEntity<Iterable<User>> getAll() {
    Iterable<User> all = userService.findAll();
    return new ResponseEntity<>(all, HttpStatus.OK);
}

@RequestMapping(method = RequestMethod.POST, path = "/member")
public ResponseEntity<User> register(@RequestBody User input) {
    User result = userService.registerUser(input);
    return new ResponseEntity<>(result, HttpStatus.OK);
}
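To try the two endpoints without a REST tool, something like the following plain-Java client could be used. This is just a sketch under a few assumptions: the service runs on localhost:8081 (the port from bootstrap.yml), it accepts JSON with username and password fields, and the class MemberClient and the sample values are invented for the example.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.StandardCharsets;

// Hypothetical client for trying out the /member endpoints; not part of ms-user.
class MemberClient {

    // POST /member with a JSON body; returns the HTTP status code.
    static int register(String baseUrl, String username, String password) throws IOException {
        // Naive JSON building: assumes the values contain no quotes or backslashes.
        String json = "{\"username\":\"" + username + "\",\"password\":\"" + password + "\"}";
        HttpURLConnection conn =
                (HttpURLConnection) URI.create(baseUrl + "/member").toURL().openConnection();
        try {
            conn.setRequestMethod("POST");
            conn.setRequestProperty("Content-Type", "application/json");
            conn.setDoOutput(true);
            try (OutputStream out = conn.getOutputStream()) {
                out.write(json.getBytes(StandardCharsets.UTF_8));
            }
            return conn.getResponseCode();
        } finally {
            conn.disconnect();
        }
    }

    // GET /member; returns the response body as text.
    static String listAll(String baseUrl) throws IOException {
        HttpURLConnection conn =
                (HttpURLConnection) URI.create(baseUrl + "/member").toURL().openConnection();
        try (InputStream in = conn.getInputStream()) {
            ByteArrayOutputStream body = new ByteArrayOutputStream();
            byte[] buffer = new byte[1024];
            int read;
            while ((read = in.read(buffer)) != -1) {
                body.write(buffer, 0, read);
            }
            return new String(body.toByteArray(), StandardCharsets.UTF_8);
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        String baseUrl = "http://localhost:8081"; // port taken from bootstrap.yml
        System.out.println("POST /member -> " + register(baseUrl, "jane@example.com", "secret"));
        System.out.println("GET /member  -> " + listAll(baseUrl));
    }
}
```

With the service running, the POST should come back with status 200, since the controller returns HttpStatus.OK, and the GET should return the stored users as JSON.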

Let’s take a closer look at the sender configuration. To produce messages for Kafka topics we need a KafkaTemplate, the class responsible for executing high-level operations. The KafkaTemplate needs a ProducerFactory, which sets the strategy for creating Producer instances. The ProducerFactory in turn needs a Map of configuration properties, the most important of which are BOOTSTRAP_SERVERS_CONFIG, KEY_SERIALIZER_CLASS_CONFIG and VALUE_SERIALIZER_CLASS_CONFIG.

/SenderConfig.java

@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

    return props;
}

@Bean
public ProducerFactory<String, User> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public KafkaTemplate<String, User> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

@Bean
public Sender sender() {
    return new Sender();
}

Here we configured the producer to send messages to the Kafka server on localhost:9092 (bootstrapServers, taken from the Config server). The User object is converted into a JSON message payload by JsonSerializer. We finish with the implementation of the Sender bean, which uses the KafkaTemplate configured above.

/Sender.java

public class Sender {

    @Autowired
    private KafkaTemplate<String, User> kafkaTemplate;

    public void send(String topic, User payload) {
        kafkaTemplate.send(topic, payload);
    }
}
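For reference, the ProducerConfig constants used in SenderConfig are plain string keys, and the serializers can also be given as class names. The sketch below builds an equivalent map without any Kafka or Spring classes on the classpath. The class ProducerSettings is hypothetical, and the key strings are what the constants resolve to in the Kafka client versions I have used, so check ProducerConfig in your own version.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, dependency-free view of the producer settings from SenderConfig.
class ProducerSettings {

    static Map<String, Object> producerConfigs(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        // ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
        props.put("bootstrap.servers", bootstrapServers);
        // ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
        props.put("value.serializer", "org.springframework.kafka.support.serializer.JsonSerializer");
        return props;
    }

    public static void main(String[] args) {
        producerConfigs("localhost:9092")
                .forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Using the constants, as SenderConfig does, is still the safer choice because a typo in a key is then caught by the compiler.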

The business logic that sends a message to Kafka when a new user is saved in the database goes in the UserService implementation.

/UserServiceImpl.java

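// Spring does not inject @Value into static fields, so the topic name is an instance field
@Value("${spring.kafka.topic.userCreated}")
private String userCreatedTopic;

// Assumed to be injected by Spring (e.g. through the constructor); the wiring is not shown here
private UserRepository userRepository;
private Sender sender;

@Override
public User registerUser(User input) {
    // Save first, then publish the stored user (including its generated id)
    User createdUser = userRepository.save(input);
    sender.send(userCreatedTopic, createdUser);
    return createdUser;
}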
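The save-then-publish order can be checked without a database or a broker by replacing both collaborators with simple functions. The sketch below is a plain-Java stand-in, not the real UserServiceImpl: RegistrationFlow and its parameters are hypothetical, and a String plays the role of the User.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.UnaryOperator;

// Plain-Java stand-in for the save-then-publish step; all names are hypothetical.
class RegistrationFlow {

    private final UnaryOperator<String> save;         // plays the role of userRepository.save
    private final BiConsumer<String, String> publish; // plays the role of sender.send(topic, payload)
    private final String topic;

    RegistrationFlow(UnaryOperator<String> save, BiConsumer<String, String> publish, String topic) {
        this.save = save;
        this.publish = publish;
        this.topic = topic;
    }

    String register(String username) {
        String saved = save.apply(username); // if saving throws, nothing is published
        publish.accept(topic, saved);
        return saved;
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        RegistrationFlow flow = new RegistrationFlow(
                name -> { events.add("saved " + name); return name; },
                (destination, payload) -> events.add("sent " + payload + " to " + destination),
                "USER_CREATED_TOPIC");
        flow.register("jane@example.com");
        System.out.println(events);
        // prints [saved jane@example.com, sent jane@example.com to USER_CREATED_TOPIC]
    }
}
```

One thing this ordering does not cover is a failure on the Kafka side after the user is already stored. As far as I know KafkaTemplate.send is asynchronous, so such a failure would not necessarily surface as an exception in registerUser.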

You can check the next parts of the blog:
● Building microservices with Netflix OSS, Apache Kafka and Spring Boot – Part 3: Email service and Gateway
● Building microservices with Netflix OSS, Apache Kafka and Spring Boot – Part 4: Security.


Java Expert at Dreamix