After Building microservices with Netflix OSS, Apache Kafka and Spring Boot – Part 1: Service registry and Config server, here is what comes next:
Message Broker (Kafka & ZooKeeper)
Although we will not use Kafka's distributed features for this test, Kafka is still a distributed system and relies on ZooKeeper to track the status of its cluster nodes, topics, partitions, etc. So before using Kafka it is necessary to have ZooKeeper installed. The following commands install ZooKeeper and Kafka on Ubuntu 16.04.
Install ZooKeeper
$ sudo apt-get install zookeeperd
Once installed, ZooKeeper starts automatically as a daemon and by default listens on port 2181.
Ask ZooKeeper if it is OK
$ telnet localhost 2181
Type in ruok and press Enter, and you should be answered:
imok
Connection closed by foreign host.
Download the latest Kafka
Go to https://kafka.apache.org/downloads and look for the latest binary release link (currently it is kafka_2.11-0.11.0.1.tgz). Following the link, you will be taken to a page suggesting a mirror site for your download.
$ mkdir ~/kafka
$ wget https://apache.cbox.biz/kafka/0.11.0.1/kafka_2.11-0.11.0.1.tgz
$ tar -xvzf kafka_2.11-0.11.0.1.tgz -C ~/kafka --strip-components=1
Configure the Kafka Server
You need to update the server.properties file. By default, deleting topics is disabled, so it is good to enable it by adding delete.topic.enable = true at the end of the file.
$ nano ~/kafka/config/server.properties

delete.topic.enable = true
Run the Kafka Server as a background process
$ nohup ~/kafka/bin/kafka-server-start.sh ~/kafka/config/server.properties > ~/kafka/kafka.log 2>&1 &
Verify Kafka is running
$ echo dump | nc localhost 2181 | grep brokers
User Service
Now that we have Kafka running, we can continue with building the user microservice. As mentioned in Part 1, it will:
1. Register itself with the Service registry (Eureka)
2. Take its configuration from the Config server (Spring Cloud Config)
3. Have two endpoints:
   - /member – a POST request registers a new user
   - /member – a GET request returns all registered users
4. On every new registration, send a “USER_REGISTERED” message to the message broker (Kafka)
5. Store the registered users in an in-memory H2 database for later reference
Let’s first create a new Spring Boot project (ms-user) with SPRING INITIALIZR.
The following dependencies will be needed: Eureka Discovery; JPA; H2; Kafka; Config Client.
/pom.xml
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-config</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
    <scope>runtime</scope>
</dependency>
Same as for the config server, enable the discovery client by adding the @EnableEurekaClient annotation to the main Application class, and add the name and the running port of the microservice to the application configuration file. What is new here is enabling cloud config discovery: it makes the microservice look up the Config server through the Service registry, knowing only the Config server's service id. No hardcoded URLs or ports are needed.
/bootstrap.yml
server:
  port: 8081
spring:
  application:
    name: ms-user
  cloud:
    config:
      discovery:
        enabled: true
        service-id: ms-config-server
The configuration for H2, the datasource, and Kafka will be read from the config server, so it goes in the ms-config-properties folder under ms-user.
/ms-user.yml
spring:
  h2:
    console:
      enabled: true
      path: /h2-console
  datasource:
    url: jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE
    username: sa
    password:
  kafka:
    bootstrap-servers: localhost:9092
    topic:
      userCreated: USER_CREATED_TOPIC
security:
  basic:
    enabled: false
First we will create a simple Spring web project structure with a User entity, UserRepository, UserService and UserController. We will not discuss them at length, as this structure is common and frequently used in Spring projects.
The User entity will be used to transfer the data. It has a simple structure: just a username and a password. As the username we will store the email to which a confirmation message will be sent.
/User.java
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;

@NotNull
private String username;

@NotNull
private String password;
We will use Spring Data to handle the CRUD operations on the User entity, so the UserRepository will be simple too.
/UserRepository.java
public interface UserRepository extends CrudRepository<User, Long> { }
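Spring Data generates the implementation of this interface at runtime. Conceptually, the inherited save and findAll methods behave like the in-memory sketch below — a hypothetical illustration only (the generated code actually talks to the H2 database through JPA), with a minimal User class included just to keep it self-contained:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

// Minimal stand-in for the entity, only so this sketch compiles on its own.
class User {
    Long id;
    String username;
}

// Hypothetical in-memory stand-in for what CrudRepository gives us for free.
class InMemoryUserRepository {
    private final Map<Long, User> store = new HashMap<>();
    private long nextId = 1;

    // Mirrors CrudRepository.save: assigns an id on first insert
    User save(User user) {
        if (user.id == null) {
            user.id = nextId++;
        }
        store.put(user.id, user);
        return user;
    }

    // Mirrors CrudRepository.findAll
    Iterable<User> findAll() {
        return new ArrayList<>(store.values());
    }
}
```

With the real repository, save also updates existing rows, and findAll reads from the database instead of a map.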
In the UserService we will have the methods for registering a user and getting all users.
/UserService.java
public interface UserService {

    User registerUser(User input);

    Iterable<User> findAll();
}
Finally, in the UserController create the GET /member and POST /member REST endpoints, which just call the UserService and return its result.
@RequestMapping(method = RequestMethod.GET, path = "/member")
public ResponseEntity<Iterable<User>> getAll() {
    Iterable<User> all = userService.findAll();
    return new ResponseEntity<>(all, HttpStatus.OK);
}

@RequestMapping(method = RequestMethod.POST, path = "/member")
public ResponseEntity<User> register(@RequestBody User input) {
    User result = userService.registerUser(input);
    return new ResponseEntity<>(result, HttpStatus.OK);
}
Let’s take a closer look at the sender configuration. To produce messages for Kafka topics we need a KafkaTemplate, the class responsible for executing high-level send operations. The KafkaTemplate needs a ProducerFactory, which sets the strategy for creating Producer instances. The ProducerFactory, for its part, needs a Map of configuration properties, the most important of which are BOOTSTRAP_SERVERS_CONFIG, KEY_SERIALIZER_CLASS_CONFIG and VALUE_SERIALIZER_CLASS_CONFIG.
/SenderConfig.java
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return props;
}

@Bean
public ProducerFactory<String, User> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public KafkaTemplate<String, User> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

@Bean
public Sender sender() {
    return new Sender();
}
Here we configured messages to be sent to the Kafka server on localhost:9092 (bootstrapServers is taken from the cloud config server). The message payload is produced from the User object with the help of JsonSerializer. We finish with the implementation of the Sender bean, which uses the KafkaTemplate configured above.
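As a side note, the ProducerConfig constants used in SenderConfig are just symbolic names for Kafka's plain string property keys. The same map could be built with literal strings, shown here as a plain-Java sketch with the serializer class names spelled out (SenderConfig passes the Class objects instead, which Spring Kafka also accepts):

```java
import java.util.HashMap;
import java.util.Map;

public class ProducerProps {
    // The same producer configuration as in SenderConfig, but with
    // the Kafka property keys written out literally instead of via
    // the ProducerConfig constants they resolve to.
    public static Map<String, Object> literalProducerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
        props.put("bootstrap.servers", "localhost:9092");
        // ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
        props.put("value.serializer",
                "org.springframework.kafka.support.serializer.JsonSerializer");
        return props;
    }
}
```

Using the constants is preferred in real code, since typos in literal keys fail silently at runtime.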
/Sender.java
public class Sender {

    @Autowired
    private KafkaTemplate<String, User> kafkaTemplate;

    public void send(String topic, User payload) {
        kafkaTemplate.send(topic, payload);
    }
}
The business logic that sends a message to Kafka when a new user is saved in the database goes in the UserService implementation.
/UserServiceImpl.java
// @Value cannot inject into a static field, so the topic name is an instance field
@Value("${spring.kafka.topic.userCreated}")
private String userCreatedTopic;

@Autowired
private UserRepository userRepository;

@Autowired
private Sender sender;

@Override
public User registerUser(User input) {
    User createdUser = userRepository.save(input);
    sender.send(userCreatedTopic, createdUser);
    return createdUser;
}
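The order of operations in registerUser matters: the user is persisted first, and the event is published only after the save returns. A minimal plain-Java sketch of that save-then-publish flow, with the repository and the sender stubbed out (everything in this sketch is hypothetical, for illustration only):

```java
import java.util.ArrayList;
import java.util.List;

// Stubbed sketch of the save-then-publish flow in UserServiceImpl.
// Each step is recorded so the ordering is visible; these are not
// the real Spring beans.
public class RegisterFlowSketch {
    final List<String> steps = new ArrayList<>();

    String registerUser(String username) {
        // 1. Persist the user (stands in for userRepository.save(input))
        steps.add("save:" + username);
        // 2. Publish the event only after the save succeeded
        //    (stands in for sender.send(userCreatedTopic, createdUser))
        steps.add("send:USER_CREATED_TOPIC:" + username);
        return username;
    }
}
```

If the save throws, no event is sent, so consumers never see users that were not actually stored.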
You can check out the next parts of the blog:
● Building microservices with Netflix OSS, Apache Kafka and Spring Boot – Part 3: Email service and Gateway
● Building microservices with Netflix OSS, Apache Kafka and Spring Boot – Part 4: Security.
Don’t forget to share your opinion in the comments section below.