Installation and Examples
Requirements
Apache Wayang is built on Java 11 and Scala 2.12. If you intend to build Wayang from source, you wi[...]
Get Wayang
Apache Wayang is available through Maven Central. For instance, to use Wayang in your Maven-based project, simply add [...]
<dependency>
<groupId>org.apache.wayang</groupId>
<artifactId>wayang-***</artifactId>
<version>0.7.1</version>
</dependency>
Note the ***: Wayang ships with multiple modules that can be included in your app, depending on how you want to use it:
- wayang-core: provides core data structures and the optimizer (required)
- wayang-basic: provides common operators and data types for your apps (recommended)
- wayang-api: provides an easy-to-use Scala and Java API to assemble Wayang plans (recommended)
- wayang-java, wayang-spark, wayang-graphchi, wayang-sqlite3, wayang-postgres: adapters for the various supported processing platforms
- wayang-profiler: provides functionality to learn operator and UDF cost functions from historical execution data
For the sake of version flexibility, you still have to add the Hadoop (hadoop-hdfs and hadoop-common) and Spark (spark-core and spark-graphx) dependencies in the versions of your choice.
In addition, you can obtain the most recent snapshot version of Wayang via Sonatype's snapshot repository. Just include the following in your pom.xml:
<repositories>
<repository>
<id>sonatype-snapshots</id>
<name>Sonatype Snapshot Repository</name>
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
</repository>
</repositories>
If you need to rebuild Wayang, e.g., to use a different Scala version, you can simply do so via Maven:
- Adapt the version variables (e.g., spark.version) in the main pom.xml file.
- Build Wayang with the adapted versions:
$ mvn clean install
Note the standalone profile, which fixes the Hadoop and Spark versions so that Wayang apps do not need to declare the corresponding dependencies explicitly. Also note the distro profile, which assembles a binary Wayang distribution. To activate these profiles, specify them when running Maven, i.e., mvn clean install -P<profile name>
Configure Wayang
For Apache Wayang to work, you need to provide it with details about your processing platforms' capabilities and how to interact with them. A default configuration is available for [...]
$ java -Dwayang.configuration=url://to/my/wayang.properties ...
Essential configuration settings:
- General settings
  - wayang.core.log.enabled (= true): whether to log execution statistics to allow learning better cardinality and cost estimators for the optimizer
  - wayang.core.log.executions (= ~/.wayang/executions.json): where to log execution times of operator groups
  - wayang.core.log.cardinalities (= ~/.wayang/cardinalities.json): where to log cardinality measurements
  - wayang.core.optimizer.instrumentation (= org.apache.wayang.core.profiling.OutboundInstrumentationStrategy): where to measure cardinalities in Wayang plans; other options are `org.apache.wa[...]
  - wayang.core.optimizer.reoptimize (= false): whether to progressively optimize Wayang plans
  - wayang.basic.tempdir (= file:///tmp): where to store temporary files, in particular for inter-platform communication
- Java Streams
  - wayang.java.cpu.mhz (= 2700): clock frequency of the processor the JVM runs on, in MHz
  - wayang.java.hdfs.ms-per-mb (= 2.7): average throughput from HDFS to the JVM, in ms/MB
- Apache Spark
  - spark.master (= local): Spark master
  - various other Spark settings are supported, e.g., spark.executor.memory, spark.serializer, ...
  - wayang.spark.cpu.mhz (= 2700): clock frequency of the processor the Spark workers run on, in MHz
  - wayang.spark.hdfs.ms-per-mb (= 2.7): average throughput from HDFS to the Spark workers, in ms/MB
  - wayang.spark.network.ms-per-mb (= 8.6): average network throughput of the Spark workers, in ms/MB
  - wayang.spark.init.ms (= 4500): time it takes Spark to initialize, in ms
- GraphChi
  - wayang.graphchi.cpu.mhz (= 2700): clock frequency of the processor GraphChi runs on, in MHz
  - wayang.graphchi.cpu.cores (= 2): number of cores GraphChi runs on
  - wayang.graphchi.hdfs.ms-per-mb (= 2.7): average throughput from HDFS to GraphChi, in ms/MB
- SQLite
  - wayang.sqlite3.jdbc.url: JDBC URL for connecting to SQLite
  - wayang.sqlite3.jdbc.user: optional user name
  - wayang.sqlite3.jdbc.password: optional password
  - wayang.sqlite3.cpu.mhz (= 2700): clock frequency of the processor SQLite runs on, in MHz
  - wayang.sqlite3.cpu.cores (= 2): number of cores SQLite runs on
- PostgreSQL
  - wayang.postgres.jdbc.url: JDBC URL for connecting to PostgreSQL
  - wayang.postgres.jdbc.user: optional user name
  - wayang.postgres.jdbc.password: optional password
  - wayang.postgres.cpu.mhz (= 2700): clock frequency of the processor PostgreSQL runs on, in MHz
  - wayang.postgres.cpu.cores (= 2): number of cores PostgreSQL runs on
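The settings above are key/value pairs. The following is a minimal sketch that assumes the wayang.properties file follows standard Java properties syntax; its file name and the backslash-continued template in the Cost Functions section suggest this. The sketch loads a hypothetical properties fragment with java.util.Properties and reads a few keys, falling back to the defaults listed above when a key is absent. It does not use Wayang's own Configuration class. The class name ConfigSketch and the sample values are made up for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class ConfigSketch {
    // Hypothetical wayang.properties content using keys from the list above.
    public static final String SAMPLE =
        "spark.master = local[4]\n"
        + "wayang.spark.init.ms = 3000\n"
        + "wayang.postgres.jdbc.url = jdbc:postgresql://localhost:5432/mydb\n";

    public static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            // StringReader does no real I/O, but Properties.load declares IOException.
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Reads a few settings, falling back to the documented defaults for absent keys.
    public static String summary(Properties props) {
        String master = props.getProperty("spark.master", "local");
        long sparkInitMs = Long.parseLong(props.getProperty("wayang.spark.init.ms", "4500"));
        int javaCpuMhz = Integer.parseInt(props.getProperty("wayang.java.cpu.mhz", "2700"));
        double javaHdfsMsPerMb = Double.parseDouble(props.getProperty("wayang.java.hdfs.ms-per-mb", "2.7"));
        return master + " " + sparkInitMs + " " + javaCpuMhz + " " + javaHdfsMsPerMb;
    }

    public static void main(String[] args) {
        // Prints "local[4] 3000 2700 2.7": two explicit values, two documented defaults.
        System.out.println(summary(load(SAMPLE)));
    }
}
```

Explicitly set keys override the defaults, and every key not present in the file keeps its documented default.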
To define your applications with Apache Wayang, use its Scala or Java API, found in the wayang-api module. For illustrations, refer to the provided exampl[...]
Cost Functions
Wayang provides a utility to learn cost functions from historical execution data. Specifically, Wayang can learn configurations for load profile estimators (that estimate CPU load, disk load etc.[...]
As an example, the JavaMapOperator draws its load profile estimator configuration via the configuration key wayang.java.map.load.
You can then specify a load profile estimator template in the configuration under the key <original key>.template, e.g.:
wayang.java.map.load.template = {\
"in":1, "out":1,\
"cpu":"?*in0"\
}
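The trailing backslashes in the template above behave like ordinary Java properties line continuations, so the whole template reads as one value. The sketch below again uses java.util.Properties, not Wayang's own configuration loader. It shows how the template key is derived from the operator's key by appending ".template" and what value results. The names TemplateKeySketch, templateKey, and readTemplate are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class TemplateKeySketch {
    // The template from the text above, as it might appear in a properties file.
    static final String PROPERTIES =
        "wayang.java.map.load.template = {\\\n"
        + "  \"in\":1, \"out\":1,\\\n"
        + "  \"cpu\":\"?*in0\"\\\n"
        + "}\n";

    // A template lives under the operator's configuration key plus ".template".
    public static String templateKey(String originalKey) {
        return originalKey + ".template";
    }

    public static String readTemplate(String originalKey) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(PROPERTIES));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty(templateKey(originalKey));
    }

    public static void main(String[] args) {
        // Continuation lines are joined and their leading whitespace is dropped:
        // prints {"in":1, "out":1,"cpu":"?*in0"}
        System.out.println(readTemplate("wayang.java.map.load"));
    }
}
```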
This template encapsulates a load profile estimator that requires at least one input cardinality and one output cardinality. Furthermore, it models CPU load by assuming a direct relationship[...]
In particular, you can use
- the variables in0, in1, ... and out0, out1, ... to incorporate the input and output cardinalities, respectively;
- operator properties, such as numIterations for the PageRankOperator implementations;
- the operators +, -, *, /, %, ^, and parentheses;
- the functions min(x0, x1, ...), max(x0, x1, ...), abs(x), log(x, base), ln(x), ld(x);
- and the constants e and pi.
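To make this expression language concrete, here is a small recursive-descent evaluator for formulas built from the elements listed above, once every ? has been replaced by a number. It is a hypothetical sketch, not Wayang's parser. It assumes the following: ^ is right-associative exponentiation that binds tighter than unary minus, % is the floating-point remainder, log(x, base) is the logarithm of x to the given base, ln is the natural logarithm, and ld is the binary logarithm. Variable values such as in0, out0, or numIterations are passed in a map.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical evaluator for load-profile formulas whose "?" placeholders
// have already been replaced by concrete numbers (not Wayang's own parser).
public class CostFormula {
    private final String src;
    private final Map<String, Double> vars;
    private int pos = 0;
    private CostFormula(String src, Map<String, Double> vars) {
        this.src = src.replace(" ", ""); // whitespace is not significant
        this.vars = vars;
    }
    // Evaluates e.g. "2.5*in0 + max(out0, 10)", looking up in0/out0 in vars.
    public static double eval(String formula, Map<String, Double> vars) {
        CostFormula p = new CostFormula(formula, vars);
        double value = p.sum();
        if (p.pos != p.src.length()) throw new IllegalArgumentException("Unexpected input at " + p.pos);
        return value;
    }
    private boolean eat(char c) {
        if (pos < src.length() && src.charAt(pos) == c) { pos++; return true; }
        return false;
    }
    private void expect(char c) {
        if (!eat(c)) throw new IllegalArgumentException("Expected '" + c + "' at " + pos);
    }
    // sum := product (('+' | '-') product)*
    private double sum() {
        double v = product();
        while (true) {
            if (eat('+')) v += product();
            else if (eat('-')) v -= product();
            else return v;
        }
    }
    // product := unary (('*' | '/' | '%') unary)*
    private double product() {
        double v = unary();
        while (true) {
            if (eat('*')) v *= unary();
            else if (eat('/')) v /= unary();
            else if (eat('%')) v %= unary();
            else return v;
        }
    }
    // unary := '-' unary | power
    private double unary() { return eat('-') ? -unary() : power(); }
    // power := atom ('^' unary)?  (right-associative; binds tighter than unary minus)
    private double power() {
        double base = atom();
        return eat('^') ? Math.pow(base, unary()) : base;
    }
    // atom := number | '(' sum ')' | name '(' sum (',' sum)* ')' | name
    private double atom() {
        if (eat('(')) { double v = sum(); expect(')'); return v; }
        int start = pos;
        if (pos < src.length() && (Character.isDigit(src.charAt(pos)) || src.charAt(pos) == '.')) {
            while (pos < src.length() && (Character.isDigit(src.charAt(pos)) || src.charAt(pos) == '.')) pos++;
            return Double.parseDouble(src.substring(start, pos));
        }
        while (pos < src.length() && Character.isLetterOrDigit(src.charAt(pos))) pos++;
        String name = src.substring(start, pos);
        if (name.isEmpty()) throw new IllegalArgumentException("Expected a value at " + start);
        if (eat('(')) {
            List<Double> args = new ArrayList<>();
            do { args.add(sum()); } while (eat(','));
            expect(')');
            return call(name, args);
        }
        if (name.equals("e")) return Math.E;
        if (name.equals("pi")) return Math.PI;
        Double v = vars.get(name);
        if (v == null) throw new IllegalArgumentException("Unknown variable " + name);
        return v;
    }
    private static double call(String name, List<Double> a) {
        switch (name) {
            case "min": return a.stream().mapToDouble(Double::doubleValue).min().getAsDouble();
            case "max": return a.stream().mapToDouble(Double::doubleValue).max().getAsDouble();
            case "abs": return Math.abs(a.get(0));
            case "log": return Math.log(a.get(0)) / Math.log(a.get(1)); // log(x, base)
            case "ln": return Math.log(a.get(0));
            case "ld": return Math.log(a.get(0)) / Math.log(2); // assumed: binary logarithm
            default: throw new IllegalArgumentException("Unknown function " + name);
        }
    }
    public static void main(String[] args) {
        // "?*in0" with the ? fitted to 2.5, evaluated for 1000 input records: prints 2500.0
        System.out.println(eval("2.5*in0", Map.of("in0", 1000.0)));
    }
}
```

An unfilled ? is rejected as "Expected a value", which mirrors the idea that the question marks must be filled in before a template describes a concrete cost function.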
While Apache Wayang provides templates for all execution operators, you need to explicitly specify the cost functions of your user-defined functions (UDFs), which are based on co[...] Once execution data has been collected, you can run:
java ... org.apache.wayang.profiler.ga.GeneticOptimizerApp [configuration URL [execution log]]
This tool will attempt to determine suitable values for the question marks (?) within the load profile estimator templates, aligning them with the collected execution data and pre-defined confi[...]
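The genetic optimizer searches for values of the question marks that make the templates match the logged executions. For the simplest template above, "cpu":"?*in0", the same goal can be illustrated with a different and much simpler technique: an ordinary least-squares fit of a single coefficient through the origin. Minimizing sum((cpu - ? * in0)^2) gives the closed form ? = sum(in0 * cpu) / sum(in0^2). This sketch illustrates that technique only. It is not what GeneticOptimizerApp does, and the sample measurements are hypothetical.

```java
public class FitCoefficient {
    // Least-squares estimate of a in cpu ~ a * in0 (a line through the origin):
    // minimizing sum((cpu_i - a * in0_i)^2) gives a = sum(in0_i * cpu_i) / sum(in0_i^2).
    public static double fit(double[] in0, double[] cpu) {
        if (in0.length != cpu.length || in0.length == 0) {
            throw new IllegalArgumentException("need matching, non-empty samples");
        }
        double sxy = 0, sxx = 0;
        for (int i = 0; i < in0.length; i++) {
            sxy += in0[i] * cpu[i];
            sxx += in0[i] * in0[i];
        }
        return sxy / sxx;
    }

    public static void main(String[] args) {
        // Hypothetical measurements: input cardinality and observed CPU load.
        double[] in0 = {100, 200, 400};
        double[] cpu = {210, 390, 810};
        System.out.println(fit(in0, cpu));
    }
}
```

Templates with several question marks or non-linear terms have no such simple closed form. A general-purpose search such as a genetic algorithm is a natural fit for that case.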
Examples
For some executable examples, have a look at this repository.
WordCount
Java API
import org.apache.wayang.api.JavaPlanBuilder;
import org.apache.wayang.basic.data.Tuple2;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.core.optimizer.cardinality.DefaultCardinalityEstimator;
import org.apache.wayang.java.Java;
import org.apache.wayang.spark.Spark;
import java.util.Collection;
import java.util.Arrays;
public class WordcountJava {
    public static void main(String[] args) {
        // Settings
        String inputUrl = "file:/tmp.txt";
        // Get a plan builder.
        WayangContext wayangContext = new WayangContext(new Configuration())
                .withPlugin(Java.basicPlugin())
                .withPlugin(Spark.basicPlugin());
        JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext)
                .withJobName(String.format("WordCount (%s)", inputUrl))
                .withUdfJarOf(WordcountJava.class);
        // Start building the WayangPlan.
        Collection<Tuple2<String, Integer>> wordcounts = planBuilder
                // Read the text file.
                .readTextFile(inputUrl).withName("Load file")
                // Split each line by non-word characters.
                .flatMap(line -> Arrays.asList(line.split("\\W+")))
                .withSelectivity(10, 100, 0.9)
                .withName("Split words")
                // Filter empty tokens.
                .filter(token -> !token.isEmpty())
                .withSelectivity(0.99, 0.99, 0.99)
                .withName("Filter empty words")
                // Attach counter to each word.
                .map(word -> new Tuple2<>(word.toLowerCase(), 1)).withName("To lower case, add counter")
                // Sum up counters for every word.
                .reduceByKey(
                        Tuple2::getField0,
                        (t1, t2) -> new Tuple2<>(t1.getField0(), t1.getField1() + t2.getField1())
                )
                .withCardinalityEstimator(new DefaultCardinalityEstimator(0.9, 1, false, in -> Math.round(0.01 * in[0])))
                .withName("Add counters")
                // Execute the plan and collect the results.
                .collect();
        System.out.println(wordcounts);
    }
}
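To check the WordCount plan's output on small inputs, the same logic can be written with plain java.util.stream, without Wayang: split on non-word characters, drop empty tokens, lowercase, and count per word. This is only a local reference sketch with a hypothetical class name. Unlike the Wayang plan, it has no optimizer, runs in a single JVM, and counts with Long instead of Integer.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class LocalWordCount {
    // Mirrors the plan's steps: flatMap (split), filter (non-empty), map (lowercase), reduceByKey (sum).
    public static Map<String, Long> count(List<String> lines) {
        return lines.stream()
            .flatMap(line -> Arrays.stream(line.split("\\W+")))
            .filter(token -> !token.isEmpty())
            .map(String::toLowerCase)
            // TreeMap keeps the words sorted, which makes the output deterministic.
            .collect(Collectors.groupingBy(w -> w, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        // Prints {hello=2, wayang=1, world=1}
        System.out.println(count(List.of("Hello, world!", "hello Wayang")));
    }
}
```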
Scala API
import org.apache.wayang.api._
import org.apache.wayang.core.api.{Configuration, WayangContext}
import org.apache.wayang.java.Java
import org.apache.wayang.spark.Spark
object WordcountScala {
  def main(args: Array[String]): Unit = {
    // Settings
    val inputUrl = "file:/tmp.txt"
    // Get a plan builder.
    val wayangContext = new WayangContext(new Configuration)
      .withPlugin(Java.basicPlugin)
      .withPlugin(Spark.basicPlugin)
    val planBuilder = new PlanBuilder(wayangContext)
      .withJobName(s"WordCount ($inputUrl)")
      .withUdfJarsOf(this.getClass)
    val wordcounts = planBuilder
      // Read the text file.
      .readTextFile(inputUrl).withName("Load file")
      // Split each line by non-word characters.
      .flatMap(_.split("\\W+"), selectivity = 10).withName("Split words")
      // Filter empty tokens.
      .filter(_.nonEmpty, selectivity = 0.99).withName("Filter empty words")
      // Attach counter to each word.
      .map(word => (word.toLowerCase, 1)).withName("To lower case, add counter")
      // Sum up counters for every word.
      .reduceByKey(_._1, (c1, c2) => (c1._1, c1._2 + c2._2)).withName("Add counters")
      .withCardinalityEstimator((in: Long) => math.round(in * 0.01))
      // Execute the plan and collect the results.
      .collect()
    println(wordcounts)
  }
}
k-means
Wayang also supports iterative processing, which is important, e.g., for machine learning algorithms such as k-means.
Scala API
import org.apache.wayang.api._
import org.apache.wayang.core.api.{Configuration, WayangContext}
import org.apache.wayang.core.function.FunctionDescriptor.ExtendedSerializableFunction
import org.apache.wayang.core.function.ExecutionContext
import org.apache.wayang.core.optimizer.costs.LoadProfileEstimators
import org.apache.wayang.java.Java
import org.apache.wayang.spark.Spark
import scala.util.Random
import scala.collection.JavaConverters._
object kmeans {
  def main(args: Array[String]): Unit = {
    // Settings
    val inputUrl = "file:/kmeans.txt"
    val k = 5
    val iterations = 100
    val configuration = new Configuration
    // Get a plan builder; use the same configuration that the UDF cost function is read from below.
    val wayangContext = new WayangContext(configuration)
      .withPlugin(Java.basicPlugin)
      .withPlugin(Spark.basicPlugin)
    val planBuilder = new PlanBuilder(wayangContext)
      .withJobName(s"k-means ($inputUrl, k=$k, $iterations iterations)")
      .withUdfJarsOf(this.getClass)
    case class Point(x: Double, y: Double)
    case class TaggedPoint(x: Double, y: Double, cluster: Int)
    case class TaggedPointCounter(x: Double, y: Double, cluster: Int, count: Long) {
      def add_points(that: TaggedPointCounter) = TaggedPointCounter(this.x + that.x, this.y + that.y, this.cluster, this.count + that.count)
      def average = TaggedPointCounter(x / count, y / count, cluster, 0)
    }
    // Read and parse the input file(s).
    val points = planBuilder
      .readTextFile(inputUrl).withName("Read file")
      .map { line =>
        val fields = line.split(",")
        Point(fields(0).toDouble, fields(1).toDouble)
      }.withName("Create points")
    // Create initial centroids.
    val random = new Random
    val initialCentroids = planBuilder
      .loadCollection(for (i <- 1 to k) yield TaggedPointCounter(random.nextGaussian(), random.nextGaussian(), i, 0)).withName("Load random centroids")
    // Declare UDF to select centroid for each data point.
    class SelectNearestCentroid extends ExtendedSerializableFunction[Point, TaggedPointCounter] {
      /** Keeps the broadcasted centroids. */
      var centroids: Iterable[TaggedPointCounter] = _
      override def open(executionCtx: ExecutionContext): Unit = {
        // getBroadcast returns a java.util.Collection; convert it to a Scala Iterable.
        centroids = executionCtx.getBroadcast[TaggedPointCounter]("centroids").asScala
      }
      override def apply(point: Point): TaggedPointCounter = {
        var minDistance = Double.PositiveInfinity
        var nearestCentroidId = -1
        for (centroid <- centroids) {
          val distance = Math.pow(Math.pow(point.x - centroid.x, 2) + Math.pow(point.y - centroid.y, 2), 0.5)
          if (distance < minDistance) {
            minDistance = distance
            nearestCentroidId = centroid.cluster
          }
        }
        new TaggedPointCounter(point.x, point.y, nearestCentroidId, 1)
      }
    }
    // Do the k-means loop.
    val finalCentroids = initialCentroids.repeat(iterations, { currentCentroids =>
      points
        .mapJava(new SelectNearestCentroid,
          udfLoad = LoadProfileEstimators.createFromSpecification(
            "my.udf.costfunction.key", configuration
          ))
        .withBroadcast(currentCentroids, "centroids").withName("Find nearest centroid")
        .reduceByKey(_.cluster, _.add_points(_)).withName("Add up points")
        .withCardinalityEstimator(k)
        .map(_.average).withName("Average points")
    }).withName("Loop")
      // Collect the results.
      .collect()
    println(finalCentroids)
  }
}
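To see what one pass of the loop body computes, the following plain-Java sketch performs a single k-means step outside Wayang. It assigns each point to its nearest centroid, as SelectNearestCentroid does. It then sums coordinates and counts per cluster, as reduceByKey with add_points does, and divides by the count, as average does. It is an illustrative sketch with hypothetical names, written for Java 11 without records. As in the plan, a cluster that receives no points does not appear in the result.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class KMeansStep {
    // Plain counterparts of the Scala Point and centroid case classes (hypothetical names).
    public static final class Point {
        public final double x, y;
        public Point(double x, double y) { this.x = x; this.y = y; }
    }

    public static final class Centroid {
        public final double x, y;
        public final int cluster;
        public Centroid(double x, double y, int cluster) { this.x = x; this.y = y; this.cluster = cluster; }
        @Override public String toString() { return "Centroid(" + x + ", " + y + ", " + cluster + ")"; }
    }

    // One loop iteration: assign each point, add up per cluster, then average.
    public static List<Centroid> step(List<Point> points, List<Centroid> centroids) {
        if (centroids.isEmpty()) throw new IllegalArgumentException("need at least one centroid");
        // cluster id -> {sum of x, sum of y, number of points}; TreeMap keeps ids sorted
        Map<Integer, double[]> sums = new TreeMap<>();
        for (Point p : points) {
            // Nearest centroid by Euclidean distance, like SelectNearestCentroid.
            Centroid nearest = centroids.get(0);
            double minDistance = Math.hypot(p.x - nearest.x, p.y - nearest.y);
            for (Centroid c : centroids) {
                double distance = Math.hypot(p.x - c.x, p.y - c.y);
                if (distance < minDistance) {
                    minDistance = distance;
                    nearest = c;
                }
            }
            // Like reduceByKey(_.cluster, _.add_points(_)).
            double[] acc = sums.computeIfAbsent(nearest.cluster, id -> new double[3]);
            acc[0] += p.x;
            acc[1] += p.y;
            acc[2] += 1;
        }
        // Like map(_.average); clusters without points do not appear.
        List<Centroid> result = new ArrayList<>();
        for (Map.Entry<Integer, double[]> e : sums.entrySet()) {
            double[] s = e.getValue();
            result.add(new Centroid(s[0] / s[2], s[1] / s[2], e.getKey()));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Point> points = List.of(new Point(0, 0), new Point(0, 2), new Point(10, 0), new Point(10, 2));
        List<Centroid> centroids = List.of(new Centroid(1, 1, 1), new Centroid(9, 1, 2), new Centroid(50, 50, 3));
        // Prints [Centroid(0.0, 1.0, 1), Centroid(10.0, 1.0, 2)]; cluster 3 gets no points.
        System.out.println(step(points, centroids));
    }
}
```

Calling step repeatedly on its own output corresponds to what repeat(iterations, ...) does in the Scala plan. Wayang's plan additionally lets the optimizer choose the platform for each operator.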