Author: Shekh Muenuddeen

Anypoint Studio 7.6.0 brought several changes, including graphical support for Parallel For Each.

As the name suggests, something is processed in parallel, but what?

So, here I am providing a sample project with use cases to help you understand it.

Definition

Parallel For Each processes a collection of messages by splitting the collection into parts that are processed simultaneously in separate routes, within any limit configured for concurrent processing.

After all messages are processed, the results are aggregated following the same order they were in before the split and then the flow continues.

Configuration

Here are the configuration attributes that we can set. If we don’t provide a value for an attribute, its default value is used.
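
As a rough sketch, the attributes can also be written directly on the scope's XML element. The attribute names below (collection, maxConcurrency, timeout, target, targetValue) are the ones I understand the scope to accept. The values and the variable name results are illustrative assumptions only, so please verify them against the documentation for your runtime version.

```xml
<!-- Illustrative values only; leave an attribute out to use its default -->
<parallel-foreach doc:name="Parallel For Each"
	collection="#[payload]"
	maxConcurrency="2"
	timeout="10000"
	target="results"
	targetValue="#[payload]">
	<!-- Processors placed here run once per element of the collection -->
	<logger level="INFO" message="#[payload]"/>
</parallel-foreach>
```

Here collection is the expression that yields the items to split, maxConcurrency caps how many routes run at the same time, timeout is given in milliseconds, and target with targetValue store the aggregated result in a variable instead of replacing the payload.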

Use Cases

Suppose we have a list of employee IDs and need to fetch the data for each employee. With Parallel For Each, the API requests are sent in parallel, so the data is fetched in parallel.

Suppose we have a number of products and need the price and specifications of each one. We can use Parallel For Each to call the product API in parallel.

There are many such use cases where we need parallelism, and Parallel For Each is a good option for them.
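
The employee use case could look roughly like the sketch below. The names Employee_API_config, payload.employeeIds, and the /employees/{id} path are hypothetical and are not part of the sample project; the fragment also assumes the http namespace is declared and a matching request configuration exists.

```xml
<!-- Sketch only: Employee_API_config, employeeIds and /employees/{id} are hypothetical -->
<parallel-foreach doc:name="Parallel For Each" collection="#[payload.employeeIds]">
	<!-- Inside the scope, payload is a single employee id -->
	<http:request method="GET" doc:name="Get employee" config-ref="Employee_API_config" path="/employees/{id}">
		<http:uri-params>#[{id: payload}]</http:uri-params>
	</http:request>
</parallel-foreach>
```

Each id gets its own route, so the requests do not wait for one another.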

Example:

I have an array of numbers from 1 to 5 and I want to add 10 to each number.

XML 

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core" xmlns:http="http://www.mulesoft.org/schema/mule/http"
	xmlns="http://www.mulesoft.org/schema/mule/core"
	xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd">
	<http:listener-config name="HTTP_Listener_config" doc:name="HTTP Listener config" doc:id="8c02eb3e-540b-460d-87d0-13b2aa7b3fbf" >
		<http:listener-connection host="0.0.0.0" port="8081" />
	</http:listener-config>
	<flow name="parallel-for-each-example-1Flow" doc:id="378738f0-8a94-4158-a0c0-1e5cd8185be6" >
		<http:listener doc:name="parallelforeach" doc:id="e7eee7c5-c087-4ca8-a9ec-c9699cebffd1" config-ref="HTTP_Listener_config" path="parallelforeach"/>
		<ee:transform doc:name="1 to 5 numbers in array" doc:id="f728278e-bb31-4e91-b810-7a2c3cc138c1" >
			<ee:message >
				<ee:set-payload ><![CDATA[%dw 2.0
output application/json
---
[1 to 5][0]]]></ee:set-payload>
			</ee:message>
		</ee:transform>
		<parallel-foreach doc:name="Parallel For Each" doc:id="9fd67d09-3113-4ffb-9951-a2a8e25a81d7" >
			<logger level="INFO" doc:name="Parallel For Each Inside" doc:id="d6f76d17-04d6-4383-8a6a-2f33f82fb72c" message="Parallel For Each Inside #[payload]"/>
			<ee:transform doc:name="payload as Number + 10" doc:id="558f60ad-6901-4b18-a5ee-f42e9e0fb9e5" >
				<ee:message >
					<ee:set-payload ><![CDATA[%dw 2.0
output application/java
---
payload as Number + 10]]></ee:set-payload>
				</ee:message>
			</ee:transform>
		</parallel-foreach>
		<ee:transform doc:name="to JSON" doc:id="99cd1f86-0708-4262-a0cd-589757677108" >
			<ee:message >
				<ee:set-payload ><![CDATA[%dw 2.0
output application/json
---
payload]]></ee:set-payload>
			</ee:message>
		</ee:transform>
		<logger level="INFO" doc:name="Parallel For Each Payload" doc:id="f63087ba-1b68-4e15-a6de-fbe7bd8bfb55" message="Parallel For Each Payload #[payload]"/>
	</flow>
</mule>

Output

As you can see here, the output is a collection in which each element contains all of the message properties and attributes along with the payload.

[
  {
    "inboundAttachmentNames": [],
    "exceptionPayload": null,
    "inboundPropertyNames": [],
    "outboundAttachmentNames": [],
    "outboundPropertyNames": [],
    "payload": 11,
    "attributes": null
  },
  {
    "inboundAttachmentNames": [],
    "exceptionPayload": null,
    "inboundPropertyNames": [],
    "outboundAttachmentNames": [],
    "outboundPropertyNames": [],
    "payload": 12,
    "attributes": null
  },
  {
    "inboundAttachmentNames": [],
    "exceptionPayload": null,
    "inboundPropertyNames": [],
    "outboundAttachmentNames": [],
    "outboundPropertyNames": [],
    "payload": 13,
    "attributes": null
  },
  {
    "inboundAttachmentNames": [],
    "exceptionPayload": null,
    "inboundPropertyNames": [],
    "outboundAttachmentNames": [],
    "outboundPropertyNames": [],
    "payload": 14,
    "attributes": null
  },
  {
    "inboundAttachmentNames": [],
    "exceptionPayload": null,
    "inboundPropertyNames": [],
    "outboundAttachmentNames": [],
    "outboundPropertyNames": [],
    "payload": 15,
    "attributes": null
  }
]
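
If only the computed values are needed, the final transform could keep just the payload field of each element. This is a sketch under the assumption that each aggregated element exposes a payload field, as the output above suggests; the doc:name is made up for illustration.

```xml
<!-- Sketch: could replace the "to JSON" transform to return only the values -->
<ee:transform doc:name="Payloads only">
	<ee:message>
		<ee:set-payload><![CDATA[%dw 2.0
output application/json
---
// Keep only the payload of each aggregated message
payload map ((item) -> item.payload)
]]></ee:set-payload>
	</ee:message>
</ee:transform>
```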

Error Handling

Because every route is processed in parallel, if an error is thrown in one route, processing continues in all of the other routes until all finish processing.

So, a failed route will not stop the processing of the other routes that are executing in parallel.
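
To try this out, one route has to fail. The error output further down in this post quotes the expression that was evaluated, so the transform inside the scope was presumably changed to something like the sketch below. The wrapping XML is my reconstruction; only the DataWeave body is taken from that error message.

```xml
<!-- Deliberately fails for the element 2 so that its route raises an error -->
<ee:transform doc:name="payload as Number + 10">
	<ee:message>
		<ee:set-payload><![CDATA[%dw 2.0
output application/java
---
if(payload == 2)
"Shekh" as Number
else
(payload as Number + 10)
]]></ee:set-payload>
	</ee:message>
</ee:transform>
```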

When a route fails, Parallel For Each throws a Mule error of type MULE:COMPOSITE_ROUTING.

We can get the successful results and the failures from that error using the DataWeave below.

%dw 2.0
output application/json
---
{
	"Success" : error.errorMessage.payload.results,
	"Failures" : error.errorMessage.payload.failures
}

XML

In the error handler, we can return the successful results together with the failures.

<error-handler >
	<on-error-continue enableNotifications="true" logException="true" doc:name="On Error Continue" doc:id="17660890-8180-4312-89e5-8503f34ebf07" type="MULE:COMPOSITE_ROUTING">
				<ee:transform doc:name="Transform Message" doc:id="87ec677d-cfa7-4305-8dc6-b2d314996f39" >
					<ee:message >
						<ee:set-payload ><![CDATA[%dw 2.0
output application/json
---
{
	"Success" : error.errorMessage.payload.results,
	"Failures" : error.errorMessage.payload.failures
}

]]></ee:set-payload>
					</ee:message>
				</ee:transform>
	</on-error-continue>
</error-handler>

Output

{
  "Success": {
    "0": {
      "inboundAttachmentNames": [],
      "exceptionPayload": null,
      "inboundPropertyNames": [],
      "outboundAttachmentNames": [],
      "outboundPropertyNames": [],
      "payload": 11,
      "attributes": null
    }
  },
  "Failures": {
    "1": {
      "errorType": {
        "identifier": "EXPRESSION",
        "parentErrorType": {
          "identifier": "ANY",
          "parentErrorType": null,
          "namespace": "MULE"
        },
        "namespace": "MULE"
      },
      "childErrors": [],
      "errorMessage": null,
      "cause": {
        "localizedMessage": "\"Cannot coerce String (Shekh) to Number\n\n5| \"Shekh\" as Number\n   ^^^^^^^^^^^^^^^^^\nTrace:\n  at main (line: 5, column: 1)\" evaluating expression: \"%dw 2.0\noutput application/java\n---\nif(payload == 2)\n\"Shekh\" as Number\nelse\n(payload as Number + 10)\n\".",
        "cause": {
          "localizedMessage": "Cannot coerce String (Shekh) to Number\n\n5| \"Shekh\" as Number\n   ^^^^^^^^^^^^^^^^^\nTrace:\n  at main (line: 5, column: 1)",
          "cause": null,
          "message": "Cannot coerce String (Shekh) to Number\n\n5| \"Shekh\" as Number\n   ^^^^^^^^^^^^^^^^^\nTrace:\n  at main (line: 5, column: 1)",
          "stackTrace": [],
          "suppressed": []
        },
        "message": "\"Cannot coerce String (Shekh) to Number\n\n5| \"Shekh\" as Number\n   ^^^^^^^^^^^^^^^^^\nTrace:\n  at main (line: 5, column: 1)\" evaluating expression: \"%dw 2.0\noutput application/java\n---\nif(payload == 2)\n\"Shekh\" as Number\nelse\n(payload as Number + 10)\n\".",
        "stackTrace": [],
        "suppressed": []
      },
      "description": "\"Cannot coerce String (Shekh) to Number\n\n5| \"Shekh\" as Number\n   ^^^^^^^^^^^^^^^^^\nTrace:\n  at main (line: 5, column: 1)\" evaluating expression: \"%dw 2.0\noutput application/java\n---\nif(payload == 2)\n\"Shekh\" as Number\nelse\n(payload as Number + 10)\n\".",
      "detailedDescription": "\"Cannot coerce String (Shekh) to Number\n\n5| \"Shekh\" as Number\n   ^^^^^^^^^^^^^^^^^\nTrace:\n  at main (line: 5, column: 1)\" evaluating expression: \"%dw 2.0\noutput application/java\n---\nif(payload == 2)\n\"Shekh\" as Number\nelse\n(payload as Number + 10)\n\"."
    }
  }
}

For Each vs Parallel For Each

Both scopes process a collection, and each route runs with the same initial context.

The differences between these two scopes are:

  1. For Each works sequentially, while Parallel For Each processes in parallel. This difference affects error handling: the execution of For Each is interrupted as soon as an error is raised (and the Error Handler is invoked), while Parallel For Each processes every route before invoking the Error Handler with a MULE:COMPOSITE_ROUTING error type.
  2. For Each does not modify the payload, while Parallel For Each outputs a collection of the output messages from each iteration.
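
For comparison, a sequential version of the same loop might look like the sketch below. It is not part of the sample project, and the logger names are made up for illustration.

```xml
<!-- Sequential counterpart, for comparison only -->
<foreach doc:name="For Each" collection="#[payload]">
	<!-- Elements are handled one after another; an error stops the loop -->
	<logger level="INFO" doc:name="For Each Inside" message="For Each Inside #[payload]"/>
</foreach>
<!-- After the scope, payload is still the original collection -->
<logger level="INFO" doc:name="After For Each" message="After For Each #[payload]"/>
```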
