- thecodingguy
- May 2
- 2 min read
This is a Groovy script that can be used to send an XML message to a Kafka topic in a WMS env. I used it for testing sending to Kafka in WMS Dev and QA envs.
Note that this is just a sample: adjust xml_to_send as needed, and set the correct values for the Kafka server, username, password, and topic. The Apache Kafka client .jar file must also be on the WMS server environment classpath for this script to work.
It will send multiple XML messages if you use MOCA or SQL inside the { } to generate multiple xml_to_send rows for the res_send result set. There is also commented-out code to perform a Kafka commit and to sleep between messages, if needed.
{ publish data where xml_to_send =
"<SAMPLE>" ||
"<CTRL_SEG>" ||
"<TRNNAM>SAMPLE-TRAN</TRNNAM>" ||
"<TRNVER>2025.1</TRNVER>" ||
"<WHSE_ID>SAMPLE_WH_ID</WHSE_ID>" ||
"</CTRL_SEG>" ||
"</SAMPLE>"
} >> res_send
|
/*
 * Publish the Kafka connection settings together with the res_send result
 * set so the Groovy block that follows can read them: it fetches server_adr,
 * username, password and topic with ctx.getVariable() and iterates res_send.
 * The @server_adr, @username, @password and @topic values are expected to be
 * supplied by the caller.
 */
publish data where server_adr = @server_adr
and username = @username
and password = @password
and topic = @topic
and res_send = @res_send
|
[[
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.Scanner;
// Sends every xml_to_send row of the res_send result set to a Kafka topic,
// authenticating with SASL_SSL / PLAIN. The connection settings (server_adr,
// username, password, topic) are read from the MOCA context.
// Failures are written to the MOCA trace and are not re-thrown.
MocaContext ctx = MocaUtils.currentContext();
String server_address = ctx.getVariable( "server_adr" );
String username = ctx.getVariable( "username" );
String password = ctx.getVariable( "password" );
String topic_name = ctx.getVariable( "topic" );

ctx.trace( "========================================" );
ctx.trace( "Starting values - Server: " + server_address );
ctx.trace( "Username: " + username );
// Never write the password itself to the trace; only record whether one was supplied.
boolean has_password = ( password != null && !password.isEmpty() );
ctx.trace( "Password: " + ( has_password ? "<provided>" : "<not provided>" ) );
ctx.trace( "Topic: " + topic_name );

// Set configuration for the Producer
Properties configProperties = new Properties();
configProperties.put( ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server_address );
configProperties.put( ProducerConfig.RETRIES_CONFIG, "0" );
configProperties.put( ProducerConfig.ACKS_CONFIG, "all" );
configProperties.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer" );
configProperties.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer" );
configProperties.put( "security.protocol", "SASL_SSL" );
configProperties.put( "sasl.mechanism", "PLAIN" );
String js = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";";
configProperties.put( "sasl.jaas.config", String.format( js, username, password ) );

ctx.trace( "Creating Producer Class" );
org.apache.kafka.clients.producer.Producer<String, String> producer = new KafkaProducer<>( configProperties );
try
{
    // Rewind the result set so iteration starts at the first row.
    res_send.reset();

    // Optional transactional send: requires a transactional.id in the producer
    // configuration and a call to producer.initTransactions() beforehand.
    //producer.beginTransaction();
    while ( res_send.next() )
    {
        String xml_message = res_send.getString( "xml_to_send" );
        ctx.trace( "Sending XML message" );
        ctx.trace( "Server: " + server_address );
        ctx.trace( "Topic: " + topic_name );
        ctx.trace( "XML is: " + xml_message );

        ProducerRecord<String, String> rec = new ProducerRecord<String, String>( topic_name, xml_message );
        ctx.trace( "===== Sending now =====" );
        // send() is asynchronous and returns a Future. get() blocks until the
        // broker acknowledges the record (acks=all) or throws on failure, so the
        // success trace below is only written for a record that was delivered.
        producer.send( rec ).get();
        ctx.trace( "===== Successfully sent message =====" );
        // for multiple messages optional sleep - Thread.sleep( 250 );
    }
    //producer.commitTransaction();
}
catch ( Exception exp )
{
    // get() can be interrupted; restore the flag so the calling thread still sees it.
    if ( exp instanceof InterruptedException )
    {
        Thread.currentThread().interrupt();
    }
    ctx.trace( exp.toString() );
    //producer.abortTransaction();
}
finally
{
    // Always release the producer's network resources, even after a failure.
    producer.close();
    ctx.trace( "After closing KafkaProducer" );
}
ctx.trace( "========================================" );
]]