Monday, 07 July 2014

Distributed Natural Language Parsing using GridGain as Compute and Data Grid

From a discussion in the OpenCog group about AtomSpace architecture, Dr. Ben Goertzel notes:
Section 5.3 of my distributed AtomSpace design from June 2012 is titled "Importance Dynamics" and deals with the problem of handling STI and LTI (attention) values in a distributed OpenCog system... It is brief and only gives a general approach, as I figured it would be best to work out the details after the distributed Atomspace was in the detailed design phase. Recall that document was written after long discussions with you and others.
I've been experimenting with GridGain, and it seems to tick most, if not all, of the performance requirements you need. Pairing it with Neo4j as the persistent graph store would also allow intuitive querying and visual exploration of the AtomSpace.

Currently I have 34 rules (imagine that this is the number of Atoms). The core code that processes them is (Java 8):

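Collection<GridFuture<MatchedYagoRule>> matchers = Collections2.transform(ruleIds, (ruleId) ->
    // Run the job on whichever node holds the cache entry for ruleId.
    grid.compute().affinityCall(/* cache name, missing in the source */, ruleId,
        new GridCallable<MatchedYagoRule>() {
            @Override
            public MatchedYagoRule call() throws Exception {
                final YagoRule rule = cache.get(ruleId);
                Pattern pattern = Pattern.compile(rule.questionPattern_en, Pattern.CASE_INSENSITIVE);
                Matcher matcher = pattern.matcher(msg);
                if (matcher.matches()) {
                    log.info("MATCH {} Processing rule #{} {}", matcher, ruleId, /* argument missing in the source */);
                    // The named group "subject" captures the entity being asked about.
                    return new MatchedYagoRule(rule, matcher.group("subject"));
                } else {
                    log.info("not match Processing rule #{} {}", ruleId, /* argument missing in the source */);
                    return null;
                }
            }
        }));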

This probably needs explanation for someone unfamiliar with in-memory data grids, but the whole experiment does very sophisticated things with very little code and setup, and it should scale (the only comparison benchmark I could find is this 2010 article, but I expect GridGain has improved a lot since then).
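To make the per-rule work concrete without any grid involved, here is a minimal sketch in plain Java of what each job does: compile the rule's question pattern, match it against the message, and pull out the named group "subject". The class name RuleMatchSketch and the "property:subject" result string are illustrative assumptions, not part of the actual project; the three patterns are copied from the log output in this post.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-in for the rule matching step; no GridGain required.
final class RuleMatchSketch {

    // property -> English question pattern (three of the 34 rules, taken from the logs).
    static final Map<String, String> RULES = new LinkedHashMap<>();
    static {
        RULES.put("hasHeight", "How tall is (?<subject>.+)\\?");
        RULES.put("isMarriedTo", "Who did (?<subject>.+) marry\\?");
        RULES.put("wasBornIn", "Where was (?<subject>.+) born\\?");
    }

    /** Returns "property:subject" for the first rule whose pattern matches the whole message. */
    static Optional<String> match(String msg) {
        for (Map.Entry<String, String> rule : RULES.entrySet()) {
            Matcher m = Pattern.compile(rule.getValue(), Pattern.CASE_INSENSITIVE).matcher(msg);
            if (m.matches()) {
                // The named group captures the entity the question is about.
                return Optional.of(rule.getKey() + ":" + m.group("subject"));
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(match("Where was Michael Jackson born?"));
    }
}
```

In the grid version, the loop disappears: each rule is checked by its own job, on whichever node holds that rule.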

Here is how it works: the compute task (triggered by node2) for the 34 rules is distributed across nodes, and across threads (cores) inside each node. For this example I use 2 nodes on the same machine; the output of node2 is:

06:18:46.470 [gridgain-#5%pub-null%] INFO  i.a.i.e.l.l.yago.AnswerYagoFactTests - not match Processing rule hasHeight How tall is (?<subject>.+)\?
06:18:46.473 [gridgain-#7%pub-null%] INFO  i.a.i.e.l.l.yago.AnswerYagoFactTests - not match Processing rule hasEconomicGrowth How much is the economic growth of (?<subject>.+)\?
06:18:46.477 [gridgain-#6%pub-null%] INFO  i.a.i.e.l.l.yago.AnswerYagoFactTests - not match Processing rule isMarriedTo Who did (?<subject>.+) marry\?
06:18:46.485 [gridgain-#10%pub-null%] INFO  i.a.i.e.l.l.yago.AnswerYagoFactTests - Found matcher: MatchedYagoRule [rule=YagoRule [property=wasBornIn, questionPattern_en=Where was (?<subject>.+) born\?, questionPattern_id=Di mana (?<subject>.+) dilahirkan\?, answerTemplateHtml_en={{subject}} was born in {{object}}., answerTemplateHtml_id={{subject}} lahir di {{object}}.], subject=Michael Jackson]
06:18:46.485 [gridgain-#10%pub-null%] INFO  i.a.i.e.l.l.yago.AnswerYagoFactTests - Subject: MatchedYagoRule [rule=YagoRule [property=wasBornIn, questionPattern_en=Where was (?<subject>.+) born\?, questionPattern_id=Di mana (?<subject>.+) dilahirkan\?, answerTemplateHtml_en={{subject}} was born in {{object}}., answerTemplateHtml_id={{subject}} lahir di {{object}}.], subject=Michael Jackson]
[06:18:46] GridGain node stopped OK [uptime=00:00:00:989]

However, the match didn't actually happen on node2; it happened on node1:

06:18:46.436 [gridgain-#7%pub-null%] INFO  i.a.i.e.l.l.yago.AnswerYagoFactTests - MATCH java.util.regex.Matcher[pattern=Where was (?<subject>.+) born\? region=0,31 lastmatch=Where was Michael Jackson born?] Processing rule wasBornIn Where was (?<subject>.+) born\?

node1 and node2 hold different partitions of the 34 rules. node2, which triggers the job, distributes it (literally sending the Java closure bytecode over the network) to the other nodes, based on affinity to the requested rule. node1 then runs that closure/job over the entries/rules it holds. In my example node2 does the same, since it also holds a partition of the rules, but its partition doesn't include the matching rule. Every job sends back its result (map), the results are then reduced, and we get the output.
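The partition, map, and reduce flow can be imitated on a single machine. The sketch below is only a simulation under stated assumptions: "nodes" are threads in a pool, the affinity function is a plain hash of the rule id (not GridGain's actual affinity function), and there is one job per node rather than one per rule as in my real code. All names here (AffinityMapReduceSketch, nodeFor, ask, sampleRules) are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class AffinityMapReduceSketch {

    /** Three of the question patterns that appear in the log output above. */
    static Map<String, String> sampleRules() {
        Map<String, String> rules = new LinkedHashMap<>();
        rules.put("hasHeight", "How tall is (?<subject>.+)\\?");
        rules.put("isMarriedTo", "Who did (?<subject>.+) marry\\?");
        rules.put("wasBornIn", "Where was (?<subject>.+) born\\?");
        return rules;
    }

    /** Toy affinity function: the same rule id always lands on the same node. */
    static int nodeFor(String ruleId, int nodeCount) {
        return Math.floorMod(ruleId.hashCode(), nodeCount);
    }

    static List<String> ask(Map<String, String> rules, String msg, int nodeCount) {
        // Partition: each simulated node holds only its own share of the rules.
        List<Map<String, String>> partitions = new ArrayList<>();
        for (int i = 0; i < nodeCount; i++) {
            partitions.add(new LinkedHashMap<>());
        }
        for (Map.Entry<String, String> rule : rules.entrySet()) {
            partitions.get(nodeFor(rule.getKey(), nodeCount)).put(rule.getKey(), rule.getValue());
        }

        ExecutorService pool = Executors.newFixedThreadPool(nodeCount);
        try {
            // Map: one job per node, each looking only at the rules that node holds.
            List<Future<List<String>>> futures = new ArrayList<>();
            for (Map<String, String> partition : partitions) {
                Callable<List<String>> job = () -> {
                    List<String> local = new ArrayList<>();
                    for (Map.Entry<String, String> rule : partition.entrySet()) {
                        Matcher m = Pattern.compile(rule.getValue(), Pattern.CASE_INSENSITIVE).matcher(msg);
                        if (m.matches()) {
                            local.add(rule.getKey() + ":" + m.group("subject"));
                        }
                    }
                    return local;
                };
                futures.add(pool.submit(job));
            }
            // Reduce: the caller merges the per-node results into one list.
            List<String> reduced = new ArrayList<>();
            for (Future<List<String>> f : futures) {
                reduced.addAll(f.get());
            }
            return reduced;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(ask(sampleRules(), "Where was Michael Jackson born?", 2));
    }
}
```

Whichever partition holds wasBornIn produces the match, and the caller sees the same merged result regardless of how many nodes there are.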

Also, the rules are held in persistent storage, which in my simple case is just a CSV file. In reality this would be a data store such as Neo4j, PostgreSQL, or Cassandra. This means the maximum AtomSpace capacity equals the combined hard drive capacity of the nodes (depending on the replication factor). During processing, each node's RAM is used to process the data closest to it, based on affinity.

We get several nice properties:
  • distribution/partitioning of data, which means:
      • increased storage space, and
      • distribution of compute, which is enabled by
          • affinity-based computation, i.e. a node processes requests based on the atoms it already has
  • with pluggable persistent storage, the "API" (so to speak) for processing atoms remains the same, even if we process 100 GB of atoms in total with only 4 GB of RAM in total, since GridGain manages read-through and write-through based on the GridCacheStore implementation
  • GridGain allows indexes on data (which work in-memory); if used, they can complement the datastore's indexes and provide flexible querying (i.e. beyond fetching by key) while retaining performance
  • GridGain is Apache-licensed, with commercial support :) Companies using OpenCog have the option of GridGain's consulting and commercial support to tune their systems
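The read-through and write-through point in the list above is easier to see in code. This is a minimal sketch of the idea in plain Java, not GridGain's GridCacheStore API: the Store interface, MapStore, and Cache classes are hypothetical, and the in-memory side is a small LRU map so that the store can hold more entries than RAM does.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

final class ReadThroughCacheSketch {

    /** Hypothetical stand-in for the persistent store (CSV file, Neo4j, ...). */
    interface Store {
        String load(String key);
        void save(String key, String value);
    }

    /** In-memory fake store that counts loads, so cache misses are visible. */
    static final class MapStore implements Store {
        final Map<String, String> data = new HashMap<>();
        int loads = 0;
        public String load(String key) { loads++; return data.get(key); }
        public void save(String key, String value) { data.put(key, value); }
    }

    static final class Cache {
        private final Store store;
        private final LinkedHashMap<String, String> memory;

        Cache(Store store, final int maxEntries) {
            this.store = store;
            // Access-ordered map that evicts the least recently used entry when full.
            this.memory = new LinkedHashMap<String, String>(16, 0.75f, true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                    return size() > maxEntries;
                }
            };
        }

        /** Read-through: on a miss, load from the store and keep the value in memory. */
        String get(String key) {
            String value = memory.get(key);
            if (value == null) {
                value = store.load(key);
                if (value != null) {
                    memory.put(key, value);
                }
            }
            return value;
        }

        /** Write-through: every write goes to the store as well as to memory. */
        void put(String key, String value) {
            store.save(key, value);
            memory.put(key, value);
        }

        int inMemory() { return memory.size(); }
    }

    public static void main(String[] args) {
        MapStore store = new MapStore();
        Cache cache = new Cache(store, 2);
        cache.put("a", "1");
        cache.put("b", "2");
        cache.put("c", "3"); // "a" is evicted from memory but stays in the store
        System.out.println(cache.get("a") + " loads=" + store.loads + " inMemory=" + cache.inMemory());
    }
}
```

The caller only ever uses get and put; whether a value came from memory or from the store is invisible, which is the property that lets the same code work when the data is much larger than RAM.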