top of page

Groovy / MOCA for WMS KAFKA send

  • thecodingguy
  • May 2
  • 2 min read

This is a Groovy script that can be used to send an XML message to a Kafka topic in a WMS env. I used it for testing sending to Kafka in WMS Dev and QA envs.


Note this is just a sample and the xml_to_send should be adjusted as you need to and the correct values should be set for the Kafka server, username, password, and topic. Also the Apache Kafka client .jar file will need to be in the WMS server environment classpath for this script to work.

It will send multiple XMLs if you use MOCA or SQL to generate multiple xml_to_send inside the { } to put into the res_send result set. There is also code commented out to perform a Kafka commit if needed and sleep between messages if needed. { publish data where xml_to_send =

"<SAMPLE>" ||

"<CTRL_SEG>" ||

"<TRNNAM>SAMPLE-TRAN</TRNNAM>" ||

"<TRNVER>2025.1</TRNVER>" ||

"<WHSE_ID>SAMPLE_WH_ID</WHSE_ID>" ||

"</CTRL_SEG>" ||

"</SAMPLE>"

} >> res_send

|

publish data where server_adr = @server_adr

and username = @username

and password = @password

and topic = @topic

and res_send = @res_send

|

[[

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.ProducerConfig;

import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

import java.util.Scanner;


MocaContext ctx = MocaUtils.currentContext();

String server_address = ctx.getVariable( "server_adr" );

String username = ctx.getVariable( "username" );

String password = ctx.getVariable( "password" );

String topic_name = ctx.getVariable( "topic" );

ctx.trace( "========================================" );

ctx.trace( "Starting values - Server: " + server_address );

ctx.trace( "Username: " + username );

ctx.trace( "Password: " + password );

ctx.trace( "Topic: " + topic_name );

// Set configuration for the Producer

Properties configProperties = new Properties();


configProperties.put( ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server_address );

configProperties.put( ProducerConfig.RETRIES_CONFIG, "0" );

configProperties.put( ProducerConfig.ACKS_CONFIG, "all" );


configProperties.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer" );

configProperties.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer" );


configProperties.put( "security.protocol", "SASL_SSL" );

configProperties.put( "sasl.mechanism", "PLAIN" );


String js = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";";


configProperties.put( "sasl.jaas.config", String.format(js, username, password) );


ctx.trace( "Creating Producer Class" );

org.apache.kafka.clients.producer.Producer<String, String> producer = new KafkaProducer<>( configProperties );

try

{

res_send.reset();

//producer.beginTransaction();

while ( res_send.next() )

{

String xml_message = res_send.getString( "xml_to_send" );


ctx.trace( "Sending XML message );

ctx.trace( "Server: " + server_address );

ctx.trace( "Topic: " + topic_name );

ctx.trace( "XML is: " + xml_message );

ProducerRecord<String, String> rec = new ProducerRecord<String, String>( topic_name, xml_message );

ctx.trace( "===== Sending now =====" );

producer.send( rec );

ctx.trace( "===== Successfully sent message =====" );

// for multiple messages optional sleep - Thread.sleep( 250 );

}

//producer.commit( Transaction );

}

catch ( Exception exp )

{

ctx.trace( exp.toString() );

//producer.abortTransaction();

}

finally

{

producer.close();

ctx.trace( "After closing KafkaProducer" );

}


ctx.trace( "========================================" );

]]

Recent Posts

See All
Logo showing an abstract cartoon figure sitting in front of a laptop coding.  Next to the Logo is the text The Coding Guy

© 2025 by The Coding Guy, LLC

bottom of page