na hora que recebo uma mensagem via kafka minha aplicação consummer retorna esse erro:
com.google.protobuf.InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag.
meu Consummer:
@Service(“SetupReceiverConsumer”)
@Log4j2
public class SetupReceiverConsumer {
@Autowired
private SetupReceiverService service;
@Autowired
private ModelMapper modelMapper;
@KafkaListener(topics = "${spring.kafka.consumer.topic}", groupId = "${spring.kafka.consumer.group-id}")
public void execute(ConsumerRecord<String, byte[]> message) {
try {
Command comando = Command.parseFrom(message.value());
Timestamp tempo = comando.getChronology().getCreationDate();
Date data = this.modelMapper.map(tempo, Date.class);
LastSetup lastSetup = new LastSetup();
lastSetup.setProtocolo(comando.getCmd().getProtocolId());
lastSetup.setEsn(Long.parseLong(comando.getDevice().getEsn()));
lastSetup.setSetup(comando.getCmd().getPayload(0).toByteArray());
lastSetup.setDataBd(new Date());
lastSetup.setDataGravacao(data);
lastSetup.setVeioId(comando.getDevice().getModelId());
OldSetup oldSetup = new OldSetup();
oldSetup.setProtocolo(comando.getCmd().getProtocolId());
oldSetup.setEsn(Long.parseLong(comando.getDevice().getEsn()));
oldSetup.setSetup(comando.getCmd().getPayload(0).toByteArray());
oldSetup.setDataBd(new Date());
oldSetup.setDataGravacao(data);
oldSetup.setVeioId(comando.getDevice().getModelId());
service.execute(lastSetup, oldSetup);
} catch (Exception e) {
log.error(e);
}
}
}
Existem outros consumers conseguindo ler o tópico?
Se sim, tem algo errado no protocolo usado no seu consumer. Verifique se as versões estão compatíveis entre os producers e os consumers.
não tem.
vou compartilhar o producer.
@Service(“SetupService”)
public class SetupService {
String topicName = "iotBvfmDeviceGatewaySetup";
@Autowired
private ModelMapper modelMapper;
public void send(LastSetup setup) throws JsonProcessingException{
propriedades();
LastSetup lastSetup = new LastSetup();
Command comando = this.modelMapper.map(lastSetup,Command.class);
LastSetup last = this.modelMapper.map(comando,LastSetup.class);
Producer<String, byte[]> producer = new KafkaProducer<>(propriedades());
final ProducerRecord<String,byte[]> producerRecord = new ProducerRecord<String, byte[]>(topicName,SerializationUtils.serialize(last));
producer.send(producerRecord,
(recordMetadata, exception) -> {
if (exception == null){
System.out.println("Record written to offset " + recordMetadata.offset() + " timestamp " + recordMetadata.timestamp());
} else {
System.err.println("An error occurred");
exception.printStackTrace(System.err); }
});
producer.flush();
producer.close();
}
private Properties propriedades() {
Properties props = new Properties();
props.put("key.serializer","org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("bootstrap.servers","dev-newkafka01.sascar.com.br:9092,dev-newkafka02.sascar.com.br:9092,dev-newkafka03.sascar.com.br:9092");
props.put("acks", "1");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
return props;
}
}
a versão do protobuf é exatamente iguais nos dois MS.
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.21.7</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
<version>3.21.7</version>
</dependency>